Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecations in airflow.models.param #41776

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions airflow/models/dagparam.py

This file was deleted.

49 changes: 7 additions & 42 deletions airflow/models/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@

import contextlib
import copy
import datetime
import json
import logging
import warnings
from typing import TYPE_CHECKING, Any, ClassVar, ItemsView, Iterable, MutableMapping, ValuesView

from pendulum.parsing import parse_iso8601

from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
from airflow.utils import timezone
from airflow.exceptions import AirflowException, ParamValidationError
from airflow.utils.mixins import ResolveMixin
from airflow.utils.types import NOTSET, ArgNotSet

Expand Down Expand Up @@ -59,7 +54,7 @@ class Param:

def __init__(self, default: Any = NOTSET, description: str | None = None, **kwargs):
if default is not NOTSET:
self._warn_if_not_json(default)
self._check_json(default)
self.value = default
self.description = description
self.schema = kwargs.pop("schema") if "schema" in kwargs else kwargs
Expand All @@ -68,39 +63,14 @@ def __copy__(self) -> Param:
return Param(self.value, self.description, schema=self.schema)

@staticmethod
def _warn_if_not_json(value):
def _check_json(value):
try:
json.dumps(value)
except Exception:
warnings.warn(
"The use of non-json-serializable params is deprecated and will be removed in "
"a future release",
RemovedInAirflow3Warning,
stacklevel=1,
)

@staticmethod
def _warn_if_not_rfc3339_dt(value):
"""Fallback to iso8601 datetime validation if rfc3339 failed."""
try:
iso8601_value = parse_iso8601(value)
except Exception:
return None
if not isinstance(iso8601_value, datetime.datetime):
return None
warnings.warn(
f"The use of non-RFC3339 datetime: {value!r} is deprecated "
"and will be removed in a future release",
RemovedInAirflow3Warning,
stacklevel=1,
)
if timezone.is_naive(iso8601_value):
warnings.warn(
"The use naive datetime is deprecated and will be removed in a future release",
RemovedInAirflow3Warning,
stacklevel=1,
raise ParamValidationError(
"All provided parameters must be json-serializable. "
f"The value '{value}' is not serializable."
)
return value

def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
"""
Expand All @@ -120,7 +90,7 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
from jsonschema.exceptions import ValidationError

if value is not NOTSET:
self._warn_if_not_json(value)
self._check_json(value)
final_val = self.value if value is NOTSET else value
if isinstance(final_val, ArgNotSet):
if suppress_exception:
Expand All @@ -129,11 +99,6 @@ def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
try:
jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
except ValidationError as err:
if err.schema.get("format") == "date-time":
rfc3339_value = self._warn_if_not_rfc3339_dt(final_val)
if rfc3339_value:
self.value = rfc3339_value
return rfc3339_value
if suppress_exception:
return None
raise ParamValidationError(err) from None
Expand Down
5 changes: 5 additions & 0 deletions newsfragments/41776.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Removed a set of deprecations in from ``airflow.models.param``.

- Removed deprecated direct access to DagParam as module. Please import from ``airflow.models.param``.
- Ensure all param values are JSON serialiazable and raise a ``ParamValidationError`` if not.
- Ensure parsed date and time values are RFC3339 compliant.
33 changes: 10 additions & 23 deletions tests/models/test_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pytest

from airflow.decorators import task
from airflow.exceptions import ParamValidationError, RemovedInAirflow3Warning
from airflow.exceptions import ParamValidationError
from airflow.models.param import Param, ParamsDict
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils import timezone
Expand Down Expand Up @@ -88,20 +88,6 @@ def test_string_rfc3339_datetime_format(self, dt):
"""Test valid rfc3339 datetime."""
assert Param(dt, type="string", format="date-time").resolve() == dt

@pytest.mark.parametrize(
"dt",
[
pytest.param("2022-01-02 03:04:05.678901Z", id="space-sep"),
pytest.param("2022-01-02T03:04:05.678901", id="tz-naive"),
pytest.param("2022-01-02T03Z", id="datetime-with-day-only"),
pytest.param("20161001T143028+0530", id="not-formatted-date-time"),
],
)
def test_string_iso8601_datetime_invalid_rfc3339_format(self, dt):
"""Test valid iso8601 datetime but not valid rfc3339 datetime conversion."""
with pytest.warns(RemovedInAirflow3Warning):
assert Param(dt, type="string", format="date-time").resolve() == dt

@pytest.mark.parametrize(
"dt",
[
Expand Down Expand Up @@ -394,19 +380,20 @@ def return_num(num):

@pytest.mark.db_test
@pytest.mark.parametrize(
"default, should_warn",
"default, should_raise",
[
pytest.param({0, 1, 2}, True, id="default-non-JSON-serializable"),
pytest.param(None, False, id="default-None"), # Param init should not warn
pytest.param({"b": 1}, False, id="default-JSON-serializable"), # Param init should not warn
],
)
def test_param_json_warning(self, default, should_warn):
warning_msg = "The use of non-json-serializable params is deprecated"
cm = pytest.warns(DeprecationWarning, match=warning_msg) if should_warn else nullcontext()
def test_param_json_validation(self, default, should_raise):
exception_msg = "All provided parameters must be json-serializable"
cm = pytest.raises(ParamValidationError, match=exception_msg) if should_raise else nullcontext()
with cm:
p = Param(default=default)
p.resolve() # when resolved with NOTSET, should not warn.
p.resolve(value={"a": 1}) # when resolved with JSON-serializable, should not warn.
with pytest.warns(DeprecationWarning, match=warning_msg):
p.resolve(value={1, 2, 3}) # when resolved with not JSON-serializable, should warn.
if not should_raise:
p.resolve() # when resolved with NOTSET, should not warn.
p.resolve(value={"a": 1}) # when resolved with JSON-serializable, should not warn.
with pytest.raises(ParamValidationError, match=exception_msg):
p.resolve(value={1, 2, 3}) # when resolved with not JSON-serializable, should warn.
31 changes: 16 additions & 15 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@
from airflow.datasets import Dataset
from airflow.decorators import teardown
from airflow.decorators.base import DecoratedOperator
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError
from airflow.exceptions import (
AirflowException,
ParamValidationError,
RemovedInAirflow3Warning,
SerializationError,
)
from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
Expand Down Expand Up @@ -888,19 +893,18 @@ def test_roundtrip_relativedelta(self, val, expected):
[
(None, {}),
({"param_1": "value_1"}, {"param_1": "value_1"}),
({"param_1": {1, 2, 3}}, {"param_1": {1, 2, 3}}),
({"param_1": {1, 2, 3}}, ParamValidationError),
],
)
def test_dag_params_roundtrip(self, val, expected_val):
"""
Test that params work both on Serialized DAGs & Tasks
"""
if val and any([True for k, v in val.items() if isinstance(v, set)]):
with pytest.warns(
RemovedInAirflow3Warning,
match="The use of non-json-serializable params is deprecated and will be removed in a future release",
):
if expected_val == ParamValidationError:
with pytest.raises(ParamValidationError):
dag = DAG(dag_id="simple_dag", schedule=None, params=val)
# further tests not relevant
return
else:
dag = DAG(dag_id="simple_dag", schedule=None, params=val)
BaseOperator(task_id="simple_task", dag=dag, start_date=datetime(2019, 8, 1))
Expand Down Expand Up @@ -979,22 +983,19 @@ def test_full_param_roundtrip(self, param: Param):
[
(None, {}),
({"param_1": "value_1"}, {"param_1": "value_1"}),
({"param_1": {1, 2, 3}}, {"param_1": {1, 2, 3}}),
({"param_1": {1, 2, 3}}, ParamValidationError),
],
)
def test_task_params_roundtrip(self, val, expected_val):
"""
Test that params work both on Serialized DAGs & Tasks
"""
dag = DAG(dag_id="simple_dag", schedule=None)
if val and any([True for k, v in val.items() if isinstance(v, set)]):
with pytest.warns(
RemovedInAirflow3Warning,
match="The use of non-json-serializable params is deprecated and will be removed in a future release",
):
if expected_val == ParamValidationError:
with pytest.raises(ParamValidationError):
BaseOperator(task_id="simple_task", dag=dag, params=val, start_date=datetime(2019, 8, 1))
serialized_dag = SerializedDAG.to_dict(dag)
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
# further tests not relevant
return
else:
BaseOperator(task_id="simple_task", dag=dag, params=val, start_date=datetime(2019, 8, 1))
serialized_dag = SerializedDAG.to_dict(dag)
Expand Down