Skip to content

Commit

Permalink
Suppress SubDagOperator examples warnings (#39057)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis authored Apr 18, 2024
1 parent d1ad347 commit 70fc0c5
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 110 deletions.
78 changes: 43 additions & 35 deletions airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,52 @@

from __future__ import annotations

# [START example_subdag_operator]
import datetime

from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)
import warnings

section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)

some_other_task = EmptyOperator(
task_id="some-other-task",
)
# [START example_subdag_operator]
import datetime

section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)
from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

end = EmptyOperator(
task_id="end",
)
DAG_NAME = "example_subdag_operator"

with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)

section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
)

some_other_task = EmptyOperator(
task_id="some-other-task",
)

section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)

end = EmptyOperator(
task_id="end",
)

start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
1 change: 1 addition & 0 deletions docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ This SubDAG can then be referenced in your main DAG file:

.. exampleinclude:: /../../airflow/example_dags/example_subdag_operator.py
:language: python
:dedent: 4
:start-after: [START example_subdag_operator]
:end-before: [END example_subdag_operator]

Expand Down
13 changes: 1 addition & 12 deletions tests/api_connexion/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
# under the License.
from __future__ import annotations

import warnings

import pytest

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.www import app
from tests.test_utils.config import conf_vars
from tests.test_utils.decorators import dont_initialize_flask_app_submodules
Expand Down Expand Up @@ -59,15 +56,7 @@ def session():
def dagbag():
from airflow.models import DagBag

with warnings.catch_warnings():
# This explicitly shows off SubDagOperator, no point to warn about that.
warnings.filterwarnings(
"ignore",
category=RemovedInAirflow3Warning,
message=r".+Please use.+TaskGroup.+",
module=r".+example_subdag_operator$",
)
DagBag(include_examples=True, read_dags_from_db=False).sync_to_db()
DagBag(include_examples=True, read_dags_from_db=False).sync_to_db()
return DagBag(include_examples=True, read_dags_from_db=True)


Expand Down
18 changes: 4 additions & 14 deletions tests/api_experimental/common/test_mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import datetime
import warnings
from typing import Callable

import pytest
Expand All @@ -34,7 +33,6 @@
set_dag_run_state_to_success,
set_state,
)
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
Expand All @@ -52,13 +50,9 @@
def dagbag():
from airflow.models.dagbag import DagBag

with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=True)
non_serialized_dagbag.sync_to_db()
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=True)
non_serialized_dagbag.sync_to_db()
return DagBag(read_dags_from_db=True)


Expand Down Expand Up @@ -484,11 +478,7 @@ class TestMarkDAGRun:

@classmethod
def setup_class(cls):
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
dagbag = models.DagBag(include_examples=True, read_dags_from_db=False)
dagbag = models.DagBag(include_examples=True, read_dags_from_db=False)
cls.dag1 = dagbag.dags["miscellaneous_test_dag"]
cls.dag1.sync_to_db()
cls.dag2 = dagbag.dags["example_subdag_operator"]
Expand Down
15 changes: 1 addition & 14 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import re
import subprocess
import sys
import warnings
from contextlib import ExitStack, suppress
from datetime import datetime, timedelta, timezone
from pathlib import Path
Expand Down Expand Up @@ -321,24 +320,12 @@ def initial_db_init():
from flask import Flask

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils import db
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager

ignore_warnings = {
RemovedInAirflow3Warning: [
# SubDagOperator warnings
"This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`."
]
}

db.resetdb()
with warnings.catch_warnings():
for warning_category, messages in ignore_warnings.items():
for message in messages:
warnings.filterwarnings("ignore", message=re.escape(message), category=warning_category)
db.bootstrap_dagbag()
db.bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)
flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN")
Expand Down
6 changes: 5 additions & 1 deletion tests/dags/test_clear_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ def create_subdag_opt(main_dag):
max_active_tasks=2,
)
BashOperator(bash_command="echo 1", task_id="daily_job_subdag_task", dag=subdag)
with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
return SubDagOperator(
task_id=subdag_name,
subdag=subdag,
Expand Down
6 changes: 5 additions & 1 deletion tests/dags/test_impersonation_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ def print_today():
BashOperator(task_id="exec_bash_operator", bash_command='echo "Running within SubDag"', dag=subdag)


with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
subdag_operator = SubDagOperator(
task_id="test_subdag_operation", subdag=subdag, mode="reschedule", poke_interval=1, dag=dag
)
6 changes: 5 additions & 1 deletion tests/dags/test_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ def subdag(parent_dag_name, child_dag_name, args):
task_id="start",
)

with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", DEFAULT_TASK_ARGS),
Expand Down
8 changes: 1 addition & 7 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import json
import logging
import threading
import warnings
from collections import defaultdict
from unittest import mock
from unittest.mock import patch
Expand All @@ -36,7 +35,6 @@
BackfillUnfinished,
DagConcurrencyLimitReached,
NoAvailablePoolSlot,
RemovedInAirflow3Warning,
TaskConcurrencyLimitReached,
)
from airflow.executors.executor_constants import MOCK_EXECUTOR
Expand Down Expand Up @@ -77,11 +75,7 @@

@pytest.fixture(scope="module")
def dag_bag():
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
return DagBag(include_examples=True)
return DagBag(include_examples=True)


# Patch the MockExecutor into the dict of known executors in the Loader
Expand Down
11 changes: 2 additions & 9 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import datetime
import warnings
from functools import reduce
from typing import TYPE_CHECKING, Mapping
from unittest import mock
Expand All @@ -30,7 +29,7 @@
from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.decorators import setup, task, task_group, teardown
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun, DagRunNote
Expand Down Expand Up @@ -64,13 +63,7 @@
def dagbag():
from airflow.models.dagbag import DagBag

with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
# Ensure the DAGs we are looking at from the DB are up-to-date
dag_bag = DagBag(include_examples=True)
return dag_bag
return DagBag(include_examples=True)


class TestDagRun:
Expand Down
9 changes: 2 additions & 7 deletions tests/www/api/experimental/test_dag_runs_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from __future__ import annotations

import json
import warnings

import pytest

from airflow.api.common.trigger_dag import trigger_dag
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagBag, DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import Session
Expand All @@ -38,11 +36,8 @@ def _setup_session(self):
session.query(DagRun).delete()
session.commit()
session.close()
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
dagbag = DagBag(include_examples=True)

dagbag = DagBag(include_examples=True)
for dag in dagbag.dags.values():
dag.sync_to_db()
SerializedDagModel.write_dag(dag)
Expand Down
12 changes: 3 additions & 9 deletions tests/www/views/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations

import warnings
from contextlib import contextmanager
from typing import Any, Generator, NamedTuple

Expand All @@ -26,7 +25,6 @@
import pytest

from airflow import settings
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagBag
from airflow.www.app import create_app
from tests.test_utils.api_connexion_utils import delete_user
Expand All @@ -43,13 +41,9 @@ def session():

@pytest.fixture(autouse=True, scope="module")
def examples_dag_bag(session):
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
DagBag(include_examples=True).sync_to_db()
dag_bag = DagBag(include_examples=True, read_dags_from_db=True)
session.commit()
DagBag(include_examples=True).sync_to_db()
dag_bag = DagBag(include_examples=True, read_dags_from_db=True)
session.commit()
return dag_bag


Expand Down

0 comments on commit 70fc0c5

Please sign in to comment.