From 89223c98d106dea0014e3c7679d2c0f24692384f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 17 May 2024 07:51:15 +0100 Subject: [PATCH] Change dataset URI validation to raise warning instead of error in Airflow 2.9 (#39670) Closes: #39486 # Context Valid DAGs that worked in Airflow 2.8.x and had tasks with outlets with specific URIs, such as `Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")`, stopped working in Airflow 2.9.0 & Airflow 2.9.1, after #37005 was merged. This was a breaking change in an Airflow minor version. We should avoid this. Airflow < 3.0 should raise a warning, and from Airflow 3.0, we can raise errors by default. We can have a feature flag to allow users who want to see this in advance to enable errors in Airflow 2.x, but this should not be the default behaviour. The DAGs should continue working on Airflow 2.x minor/micro releases without errors (unless the user opts in via configuration). # How to reproduce Run the following DAG with `apache-airflow==2.9.1` and `apache-airflow-providers-postgres==5.11.0`, as an example: ``` from datetime import datetime from airflow import DAG from airflow.datasets import Dataset from airflow.operators.empty import EmptyOperator with DAG(dag_id='empty_operator_example', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag: task1 = EmptyOperator( task_id='empty_task1', dag=dag, outlets=[Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")] ) task2 = EmptyOperator( task_id='empty_task2', dag=dag ) task1 >> task2 ``` This causes the following exception: ``` Broken DAG: [/usr/local/airflow/dags/example_issue.py] Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri parsed = normalizer(parsed) ^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri raise ValueError("URI format postgres:// must contain 
database, schema, and table names") ValueError: URI format postgres:// must contain database, schema, and table names ``` # About the changes introduced This PR introduces the following: 1. A boolean configuration within `[core]`, named `strict_dataset_uri_validation`, which should be `False` by default. 2. When this configuration is `False`, Airflow should raise a warning saying: ``` From Airflow 3, Airflow will be more strict with Dataset URIs, and the URI xx will no longer be valid. Please, follow the expected standard as documented in XX. ``` 3. If this configuration is `True`, Airflow should raise the exception, as it does now in Airflow 2.9.0 and 2.9.1. 4. From Airflow 3.0, we change this configuration to be `True` by default. --- airflow/config_templates/config.yml | 9 ++++++++ airflow/datasets/__init__.py | 16 ++++++++++++++- tests/datasets/test_dataset.py | 32 ++++++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 36fb176e95d6c..1db38932e619b 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -464,6 +464,15 @@ core: sensitive: true default: ~ example: '{"some_param": "some_value"}' + strict_dataset_uri_validation: + description: | + Dataset URI validation should raise an exception if it is not compliant with AIP-60. + By default this configuration is false, meaning that Airflow 2.x only warns the user. + In Airflow 3, this configuration will be enabled by default. + default: "False" + example: ~ + version_added: 2.9.2 + type: boolean database_access_isolation: description: (experimental) Whether components should use Airflow Internal API for DB connectivity. 
version_added: 2.6.0 diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 37720314d3c66..d8012e68743dd 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -27,6 +27,9 @@ if TYPE_CHECKING: from urllib.parse import SplitResult + +from airflow.configuration import conf + __all__ = ["Dataset", "DatasetAll", "DatasetAny"] @@ -87,7 +90,18 @@ def _sanitize_uri(uri: str) -> str: fragment="", # Ignore any fragments. ) if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None: - parsed = normalizer(parsed) + try: + parsed = normalizer(parsed) + except ValueError as exception: + if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False): + raise exception + else: + warnings.warn( + f"The dataset URI {uri} is not AIP-60 compliant. " + f"In Airflow 3, this will raise an exception. More information: {repr(exception)}", + UserWarning, + stacklevel=3, + ) return urllib.parse.urlunsplit(parsed) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 31aebff9fad5f..5453d971d9196 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -20,15 +20,17 @@ import os from collections import defaultdict from typing import Callable +from unittest.mock import patch import pytest from sqlalchemy.sql import select -from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny +from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny, _sanitize_uri from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG +from tests.test_utils.config import conf_vars @pytest.fixture @@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) - with pytest.raises(TypeError) as info: expression() assert 
str(info.value) == error + + +def mock_get_uri_normalizer(normalized_scheme): + def normalizer(uri): + raise ValueError("Incorrect URI format") + + return normalizer + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@patch("airflow.datasets.warnings.warn") +def test__sanitize_uri_raises_warning(mock_warn): + _sanitize_uri("postgres://localhost:5432/database.schema.table") + msg = mock_warn.call_args.args[0] + assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg + assert ( + "In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')" + in msg + ) + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@conf_vars({("core", "strict_dataset_uri_validation"): "True"}) +def test__sanitize_uri_raises_exception(): + with pytest.raises(ValueError) as e_info: + _sanitize_uri("postgres://localhost:5432/database.schema.table") + assert isinstance(e_info.value, ValueError) + assert str(e_info.value) == "Incorrect URI format"