From c7149c002b14606b3118e97eb676c3e4b6ff98b4 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 13 Mar 2023 10:32:26 -0500 Subject: [PATCH] Serialize _is_setup and _is_teardown on tasks (#30009) We should make sure these attrs are in the serialized DAG so the scheduler knows if the tasks are setup or teardown tasks. --- airflow/models/baseoperator.py | 5 ++++ tests/serialization/test_dag_serialization.py | 27 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 0c5019bd58aa6..21ee60d93c3b5 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -686,6 +686,9 @@ class derived from this one results in the creation of a task object, # Set to True for an operator instantiated by a mapped operator. __from_mapped = False + _is_setup = False + _is_teardown = False + def __init__( self, task_id: str, @@ -1472,6 +1475,8 @@ def get_serialized_fields(cls): "template_fields", "template_fields_renderers", "params", + "_is_setup", + "_is_teardown", } ) DagContext.pop_context_managed_dag() diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index dbb8f1d8acc59..6e620b2aca7df 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -159,6 +159,8 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "_task_type": "BashOperator", "_task_module": "airflow.operators.bash", "pool": "default_pool", + "_is_setup": False, + "_is_teardown": False, "executor_config": { "__type": "dict", "__var": { @@ -188,6 +190,8 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "_operator_name": "@custom", "_task_module": "tests.test_utils.mock_operators", "pool": "default_pool", + "_is_setup": False, + "_is_teardown": False, }, ], "schedule_interval": {"__type": "timedelta", "__var": 86400.0}, @@ -1304,6 +1308,11 @@ def check_task_group(node): def assert_taskgroup_children(se_task_group, dag_task_group, expected_children): assert se_task_group.children.keys() == dag_task_group.children.keys() == expected_children + @staticmethod + def assert_task_is_setup_teardown(task, is_setup: bool = False, is_teardown: bool = False): + assert task._is_setup == is_setup + assert task._is_teardown == is_teardown + def test_task_group_setup_teardown_tasks(self): """ Test TaskGroup setup and teardown task serialization/deserialization. @@ -1335,6 +1344,8 @@ def test_task_group_setup_teardown_tasks(self): self.assert_taskgroup_children( serialized_dag.task_group, dag.task_group, {"setup", "teardown", "group1"} ) + self.assert_task_is_setup_teardown(serialized_dag.task_group.children["setup"], is_setup=True) + self.assert_task_is_setup_teardown(serialized_dag.task_group.children["teardown"], is_teardown=True) se_first_group = serialized_dag.task_group.children["group1"] dag_first_group = dag.task_group.children["group1"] @@ -1343,6 +1354,9 @@ def test_task_group_setup_teardown_tasks(self): dag_first_group, {"group1.setup1", "group1.task1", "group1.group2", "group1.teardown1"}, ) + self.assert_task_is_setup_teardown(se_first_group.children["group1.setup1"], is_setup=True) + self.assert_task_is_setup_teardown(se_first_group.children["group1.task1"]) + self.assert_task_is_setup_teardown(se_first_group.children["group1.teardown1"], is_teardown=True) se_second_group = se_first_group.children["group1.group2"] dag_second_group = dag_first_group.children["group1.group2"] @@ -1351,6 +1365,11 @@ def test_task_group_setup_teardown_tasks(self): dag_second_group, {"group1.group2.setup2", "group1.group2.task2", "group1.group2.teardown2"}, ) + self.assert_task_is_setup_teardown(se_second_group.children["group1.group2.setup2"], is_setup=True) + self.assert_task_is_setup_teardown(se_second_group.children["group1.group2.task2"]) + self.assert_task_is_setup_teardown( + se_second_group.children["group1.group2.teardown2"], is_teardown=True + ) def test_task_group_setup_teardown_taskgroups(self): """ @@ -1391,22 +1410,30 @@ def teardown_group(): self.assert_taskgroup_children( serialized_dag.task_group, dag.task_group, {"setup_group", "sometask", "teardown_group"} ) + self.assert_task_is_setup_teardown(serialized_dag.task_group.children["sometask"]) se_setup_group = serialized_dag.task_group.children["setup_group"] dag_setup_group = dag.task_group.children["setup_group"] self.assert_taskgroup_children( se_setup_group, dag_setup_group, {"setup_group.setup1", "setup_group.sub_setup"} ) + self.assert_task_is_setup_teardown(se_setup_group.children["setup_group.setup1"], is_setup=True) se_sub_setup_group = se_setup_group.children["setup_group.sub_setup"] dag_sub_setup_group = dag_setup_group.children["setup_group.sub_setup"] self.assert_taskgroup_children( se_sub_setup_group, dag_sub_setup_group, {"setup_group.sub_setup.setup2"} ) + self.assert_task_is_setup_teardown( + se_sub_setup_group.children["setup_group.sub_setup.setup2"], is_setup=True + ) se_teardown_group = serialized_dag.task_group.children["teardown_group"] dag_teardown_group = dag.task_group.children["teardown_group"] self.assert_taskgroup_children(se_teardown_group, dag_teardown_group, {"teardown_group.teardown1"}) + self.assert_task_is_setup_teardown( + se_teardown_group.children["teardown_group.teardown1"], is_teardown=True + ) def test_deps_sorted(self): """