diff --git a/airflow/models/dagparam.py b/airflow/models/dagparam.py deleted file mode 100644 index 78322a345695a..0000000000000 --- a/airflow/models/dagparam.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module is deprecated. Please use :mod:`airflow.models.param`.""" - -from __future__ import annotations - -import warnings - -from airflow.exceptions import RemovedInAirflow3Warning -from airflow.models.param import DagParam # noqa: F401 - -warnings.warn( - "This module is deprecated. Please use `airflow.models.param`.", - RemovedInAirflow3Warning, - stacklevel=2, -) diff --git a/airflow/models/param.py b/airflow/models/param.py index b5c9b5c4c2a5c..a4bbce2b768c6 100644 --- a/airflow/models/param.py +++ b/airflow/models/param.py @@ -18,16 +18,11 @@ import contextlib import copy -import datetime import json import logging -import warnings from typing import TYPE_CHECKING, Any, ClassVar, ItemsView, Iterable, MutableMapping, ValuesView -from pendulum.parsing import parse_iso8601 - -from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning -from airflow.utils import timezone +from airflow.exceptions import AirflowException, ParamValidationError from airflow.utils.mixins import ResolveMixin from airflow.utils.types import NOTSET, ArgNotSet @@ -59,7 +54,7 @@ class Param: def __init__(self, default: Any = NOTSET, description: str | None = None, **kwargs): if default is not NOTSET: - self._warn_if_not_json(default) + self._check_json(default) self.value = default self.description = description self.schema = kwargs.pop("schema") if "schema" in kwargs else kwargs @@ -68,39 +63,14 @@ def __copy__(self) -> Param: return Param(self.value, self.description, schema=self.schema) @staticmethod - def _warn_if_not_json(value): + def _check_json(value): try: json.dumps(value) except Exception: - warnings.warn( - "The use of non-json-serializable params is deprecated and will be removed in " - "a future release", - RemovedInAirflow3Warning, - stacklevel=1, - ) - - @staticmethod - def _warn_if_not_rfc3339_dt(value): - """Fallback to iso8601 datetime validation if rfc3339 failed.""" - try: - iso8601_value = parse_iso8601(value) - except Exception: - return None - if not isinstance(iso8601_value, datetime.datetime): - return None - warnings.warn( - f"The use of non-RFC3339 datetime: {value!r} is deprecated " - "and will be removed in a future release", - RemovedInAirflow3Warning, - stacklevel=1, - ) - if timezone.is_naive(iso8601_value): - warnings.warn( - "The use naive datetime is deprecated and will be removed in a future release", - RemovedInAirflow3Warning, - stacklevel=1, + raise ParamValidationError( + "All provided parameters must be json-serializable. " + f"The value '{value}' is not serializable." ) - return value def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any: """ @@ -120,7 +90,7 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any: from jsonschema.exceptions import ValidationError if value is not NOTSET: - self._warn_if_not_json(value) + self._check_json(value) final_val = self.value if value is NOTSET else value if isinstance(final_val, ArgNotSet): if suppress_exception: @@ -129,11 +99,6 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any: try: jsonschema.validate(final_val, self.schema, format_checker=FormatChecker()) except ValidationError as err: - if err.schema.get("format") == "date-time": - rfc3339_value = self._warn_if_not_rfc3339_dt(final_val) - if rfc3339_value: - self.value = rfc3339_value - return rfc3339_value if suppress_exception: return None raise ParamValidationError(err) from None diff --git a/newsfragments/41776.significant.rst b/newsfragments/41776.significant.rst new file mode 100644 index 0000000000000..62bc7986d2ba3 --- /dev/null +++ b/newsfragments/41776.significant.rst @@ -0,0 +1,5 @@ +Removed a set of deprecations in from ``airflow.models.param``. + +- Removed deprecated direct access to DagParam as module. Please import from ``airflow.models.param``. +- Ensure all param values are JSON serialiazable and raise a ``ParamValidationError`` if not. +- Ensure parsed date and time values are RFC3339 compliant. diff --git a/tests/models/test_param.py b/tests/models/test_param.py index 18d4c190ad28a..3d85a957ec5d9 100644 --- a/tests/models/test_param.py +++ b/tests/models/test_param.py @@ -21,7 +21,7 @@ import pytest from airflow.decorators import task -from airflow.exceptions import ParamValidationError, RemovedInAirflow3Warning +from airflow.exceptions import ParamValidationError from airflow.models.param import Param, ParamsDict from airflow.serialization.serialized_objects import BaseSerialization from airflow.utils import timezone @@ -88,20 +88,6 @@ def test_string_rfc3339_datetime_format(self, dt): """Test valid rfc3339 datetime.""" assert Param(dt, type="string", format="date-time").resolve() == dt - @pytest.mark.parametrize( - "dt", - [ - pytest.param("2022-01-02 03:04:05.678901Z", id="space-sep"), - pytest.param("2022-01-02T03:04:05.678901", id="tz-naive"), - pytest.param("2022-01-02T03Z", id="datetime-with-day-only"), - pytest.param("20161001T143028+0530", id="not-formatted-date-time"), - ], - ) - def test_string_iso8601_datetime_invalid_rfc3339_format(self, dt): - """Test valid iso8601 datetime but not valid rfc3339 datetime conversion.""" - with pytest.warns(RemovedInAirflow3Warning): - assert Param(dt, type="string", format="date-time").resolve() == dt - @pytest.mark.parametrize( "dt", [ @@ -394,19 +380,20 @@ def return_num(num): @pytest.mark.db_test @pytest.mark.parametrize( - "default, should_warn", + "default, should_raise", [ pytest.param({0, 1, 2}, True, id="default-non-JSON-serializable"), pytest.param(None, False, id="default-None"), # Param init should not warn pytest.param({"b": 1}, False, id="default-JSON-serializable"), # Param init should not warn ], ) - def test_param_json_warning(self, default, should_warn): - warning_msg = "The use of non-json-serializable params is deprecated" - cm = pytest.warns(DeprecationWarning, match=warning_msg) if should_warn else nullcontext() + def test_param_json_validation(self, default, should_raise): + exception_msg = "All provided parameters must be json-serializable" + cm = pytest.raises(ParamValidationError, match=exception_msg) if should_raise else nullcontext() with cm: p = Param(default=default) - p.resolve() # when resolved with NOTSET, should not warn. - p.resolve(value={"a": 1}) # when resolved with JSON-serializable, should not warn. - with pytest.warns(DeprecationWarning, match=warning_msg): - p.resolve(value={1, 2, 3}) # when resolved with not JSON-serializable, should warn. + if not should_raise: + p.resolve() # when resolved with NOTSET, should not warn. + p.resolve(value={"a": 1}) # when resolved with JSON-serializable, should not warn. + with pytest.raises(ParamValidationError, match=exception_msg): + p.resolve(value={1, 2, 3}) # when resolved with not JSON-serializable, should warn. diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 068e5131827de..f7aae9ce8af80 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -45,7 +45,12 @@ from airflow.datasets import Dataset from airflow.decorators import teardown from airflow.decorators.base import DecoratedOperator -from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError +from airflow.exceptions import ( + AirflowException, + ParamValidationError, + RemovedInAirflow3Warning, + SerializationError, +) from airflow.hooks.base import BaseHook from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection @@ -888,19 +893,18 @@ def test_roundtrip_relativedelta(self, val, expected): [ (None, {}), ({"param_1": "value_1"}, {"param_1": "value_1"}), - ({"param_1": {1, 2, 3}}, {"param_1": {1, 2, 3}}), + ({"param_1": {1, 2, 3}}, ParamValidationError), ], ) def test_dag_params_roundtrip(self, val, expected_val): """ Test that params work both on Serialized DAGs & Tasks """ - if val and any([True for k, v in val.items() if isinstance(v, set)]): - with pytest.warns( - RemovedInAirflow3Warning, - match="The use of non-json-serializable params is deprecated and will be removed in a future release", - ): + if expected_val == ParamValidationError: + with pytest.raises(ParamValidationError): dag = DAG(dag_id="simple_dag", schedule=None, params=val) + # further tests not relevant + return else: dag = DAG(dag_id="simple_dag", schedule=None, params=val) BaseOperator(task_id="simple_task", dag=dag, start_date=datetime(2019, 8, 1)) @@ -979,7 +983,7 @@ def test_full_param_roundtrip(self, param: Param): [ (None, {}), ({"param_1": "value_1"}, {"param_1": "value_1"}), - ({"param_1": {1, 2, 3}}, {"param_1": {1, 2, 3}}), + ({"param_1": {1, 2, 3}}, ParamValidationError), ], ) def test_task_params_roundtrip(self, val, expected_val): @@ -987,14 +991,11 @@ def test_task_params_roundtrip(self, val, expected_val): Test that params work both on Serialized DAGs & Tasks """ dag = DAG(dag_id="simple_dag", schedule=None) - if val and any([True for k, v in val.items() if isinstance(v, set)]): - with pytest.warns( - RemovedInAirflow3Warning, - match="The use of non-json-serializable params is deprecated and will be removed in a future release", - ): + if expected_val == ParamValidationError: + with pytest.raises(ParamValidationError): BaseOperator(task_id="simple_task", dag=dag, params=val, start_date=datetime(2019, 8, 1)) - serialized_dag = SerializedDAG.to_dict(dag) - deserialized_dag = SerializedDAG.from_dict(serialized_dag) + # further tests not relevant + return else: BaseOperator(task_id="simple_task", dag=dag, params=val, start_date=datetime(2019, 8, 1)) serialized_dag = SerializedDAG.to_dict(dag)