Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suppress SubDagOperator examples warnings #39057

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 43 additions & 35 deletions airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,52 @@

from __future__ import annotations

# [START example_subdag_operator]
import datetime

from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)
import warnings

section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
potiuk marked this conversation as resolved.
Show resolved Hide resolved
)

some_other_task = EmptyOperator(
task_id="some-other-task",
)
# [START example_subdag_operator]
import datetime

section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)
from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

end = EmptyOperator(
task_id="end",
)
DAG_NAME = "example_subdag_operator"

with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)

section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
)

some_other_task = EmptyOperator(
task_id="some-other-task",
)

section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)

end = EmptyOperator(
task_id="end",
)

start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
start >> section_1 >> some_other_task >> section_2 >> end
# [END example_subdag_operator]
1 change: 1 addition & 0 deletions docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ This SubDAG can then be referenced in your main DAG file:

.. exampleinclude:: /../../airflow/example_dags/example_subdag_operator.py
:language: python
:dedent: 4
:start-after: [START example_subdag_operator]
:end-before: [END example_subdag_operator]

Expand Down
13 changes: 1 addition & 12 deletions tests/api_connexion/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
# under the License.
from __future__ import annotations

import warnings

import pytest

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.www import app
from tests.test_utils.config import conf_vars
from tests.test_utils.decorators import dont_initialize_flask_app_submodules
Expand Down Expand Up @@ -59,15 +56,7 @@ def session():
def dagbag():
from airflow.models import DagBag

with warnings.catch_warnings():
# This explicitly shows off SubDagOperator, no point to warn about that.
warnings.filterwarnings(
"ignore",
category=RemovedInAirflow3Warning,
message=r".+Please use.+TaskGroup.+",
module=r".+example_subdag_operator$",
)
DagBag(include_examples=True, read_dags_from_db=False).sync_to_db()
DagBag(include_examples=True, read_dags_from_db=False).sync_to_db()
return DagBag(include_examples=True, read_dags_from_db=True)


Expand Down
18 changes: 4 additions & 14 deletions tests/api_experimental/common/test_mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import datetime
import warnings
from typing import Callable

import pytest
Expand All @@ -34,7 +33,6 @@
set_dag_run_state_to_success,
set_state,
)
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
Expand All @@ -52,13 +50,9 @@
def dagbag():
from airflow.models.dagbag import DagBag

with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=True)
non_serialized_dagbag.sync_to_db()
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=True)
non_serialized_dagbag.sync_to_db()
return DagBag(read_dags_from_db=True)


Expand Down Expand Up @@ -484,11 +478,7 @@ class TestMarkDAGRun:

@classmethod
def setup_class(cls):
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
dagbag = models.DagBag(include_examples=True, read_dags_from_db=False)
dagbag = models.DagBag(include_examples=True, read_dags_from_db=False)
cls.dag1 = dagbag.dags["miscellaneous_test_dag"]
cls.dag1.sync_to_db()
cls.dag2 = dagbag.dags["example_subdag_operator"]
Expand Down
15 changes: 1 addition & 14 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import re
import subprocess
import sys
import warnings
from contextlib import ExitStack, suppress
from datetime import datetime, timedelta, timezone
from pathlib import Path
Expand Down Expand Up @@ -321,24 +320,12 @@ def initial_db_init():
from flask import Flask

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils import db
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager

ignore_warnings = {
RemovedInAirflow3Warning: [
# SubDagOperator warnings
"This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`."
]
}

db.resetdb()
with warnings.catch_warnings():
for warning_category, messages in ignore_warnings.items():
for message in messages:
warnings.filterwarnings("ignore", message=re.escape(message), category=warning_category)
db.bootstrap_dagbag()
db.bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)
flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN")
Expand Down
6 changes: 5 additions & 1 deletion tests/dags/test_clear_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ def create_subdag_opt(main_dag):
max_active_tasks=2,
)
BashOperator(bash_command="echo 1", task_id="daily_job_subdag_task", dag=subdag)
with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
return SubDagOperator(
task_id=subdag_name,
subdag=subdag,
Expand Down
6 changes: 5 additions & 1 deletion tests/dags/test_impersonation_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ def print_today():
BashOperator(task_id="exec_bash_operator", bash_command='echo "Running within SubDag"', dag=subdag)


with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
subdag_operator = SubDagOperator(
task_id="test_subdag_operation", subdag=subdag, mode="reschedule", poke_interval=1, dag=dag
)
6 changes: 5 additions & 1 deletion tests/dags/test_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ def subdag(parent_dag_name, child_dag_name, args):
task_id="start",
)

with warnings.catch_warnings(record=True):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r"This class is deprecated\. Please use `airflow\.utils\.task_group\.TaskGroup`\.",
)
section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", DEFAULT_TASK_ARGS),
Expand Down
8 changes: 1 addition & 7 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import json
import logging
import threading
import warnings
from collections import defaultdict
from unittest import mock
from unittest.mock import patch
Expand All @@ -36,7 +35,6 @@
BackfillUnfinished,
DagConcurrencyLimitReached,
NoAvailablePoolSlot,
RemovedInAirflow3Warning,
TaskConcurrencyLimitReached,
)
from airflow.executors.executor_constants import MOCK_EXECUTOR
Expand Down Expand Up @@ -77,11 +75,7 @@

@pytest.fixture(scope="module")
def dag_bag():
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
return DagBag(include_examples=True)
return DagBag(include_examples=True)


# Patch the MockExecutor into the dict of known executors in the Loader
Expand Down
11 changes: 2 additions & 9 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import datetime
import warnings
from functools import reduce
from typing import TYPE_CHECKING, Mapping
from unittest import mock
Expand All @@ -30,7 +29,7 @@
from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.decorators import setup, task, task_group, teardown
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun, DagRunNote
Expand Down Expand Up @@ -64,13 +63,7 @@
def dagbag():
from airflow.models.dagbag import DagBag

with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
# Ensure the DAGs we are looking at from the DB are up-to-date
dag_bag = DagBag(include_examples=True)
return dag_bag
return DagBag(include_examples=True)


class TestDagRun:
Expand Down
9 changes: 2 additions & 7 deletions tests/www/api/experimental/test_dag_runs_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from __future__ import annotations

import json
import warnings

import pytest

from airflow.api.common.trigger_dag import trigger_dag
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagBag, DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import Session
Expand All @@ -38,11 +36,8 @@ def _setup_session(self):
session.query(DagRun).delete()
session.commit()
session.close()
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
dagbag = DagBag(include_examples=True)

dagbag = DagBag(include_examples=True)
for dag in dagbag.dags.values():
dag.sync_to_db()
SerializedDagModel.write_dag(dag)
Expand Down
12 changes: 3 additions & 9 deletions tests/www/views/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations

import warnings
from contextlib import contextmanager
from typing import Any, Generator, NamedTuple

Expand All @@ -26,7 +25,6 @@
import pytest

from airflow import settings
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import DagBag
from airflow.www.app import create_app
from tests.test_utils.api_connexion_utils import delete_user
Expand All @@ -43,13 +41,9 @@ def session():

@pytest.fixture(autouse=True, scope="module")
def examples_dag_bag(session):
with warnings.catch_warnings():
# Some dags use deprecated operators, e.g SubDagOperator
# if it is not imported, then it might have side effects for the other tests
warnings.simplefilter("ignore", category=RemovedInAirflow3Warning)
DagBag(include_examples=True).sync_to_db()
dag_bag = DagBag(include_examples=True, read_dags_from_db=True)
session.commit()
DagBag(include_examples=True).sync_to_db()
dag_bag = DagBag(include_examples=True, read_dags_from_db=True)
session.commit()
return dag_bag


Expand Down