Skip to content

Commit

Permalink
Move TriggerDagRunOperator to standard provider (apache#44053)
Browse files Browse the repository at this point in the history
* Moved trigger_dagrun operator to standard provider

* Changed all imports for dagrun_operator from operators to standard provider.

* Changed all imports for dagrun_operator from operators to standard provider.

* Update trigger_dagrun doc reference

* Revert Changes

* Add trigger_dagrun to standard provider.yaml file.

* Change BaseOperator[Link] import path.
  • Loading branch information
hardeybisey authored and got686-yandex committed Jan 30, 2025
1 parent 51ee52c commit 6e4f571
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import pendulum

from airflow.models.dag import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
dag_id="example_trigger_controller_dag",
Expand Down
4 changes: 2 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
log = logging.getLogger(__name__)

_OPERATOR_EXTRA_LINKS: set[str] = {
"airflow.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
"airflow.sensors.external_task.ExternalTaskSensorLink",
Expand Down Expand Up @@ -1020,7 +1020,7 @@ class DependencyDetector:
@staticmethod
def detect_task_dependencies(task: Operator) -> list[DagDependency]:
"""Detect dependencies caused by tasks."""
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

deps = []
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ While dependencies between tasks in a DAG are explicitly defined through upstrea
relationships, dependencies between DAGs are a bit more complex. In general, there are two ways
in which one DAG can depend on another:

- triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator`
- triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`

Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.operators.latest_only`
-

* - :mod:`airflow.operators.trigger_dagrun`
* - :mod:`airflow.providers.standard.operators.trigger_dagrun`
-

**Sensors:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
DagNotFound,
DagRunAlreadyExists,
)
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/standard/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ operators:
- airflow.providers.standard.operators.bash
- airflow.providers.standard.operators.python
- airflow.providers.standard.operators.generic_transfer
- airflow.providers.standard.operators.trigger_dagrun

sensors:
- integration-name: Standard
Expand Down
6 changes: 4 additions & 2 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airflow.models.dagrun import DagRun
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.settings import TracebackSessionForTests
from airflow.triggers.external_task import DagStateTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -107,7 +107,9 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session):
)
.one()
)
with mock.patch("airflow.operators.trigger_dagrun.build_airflow_url_with_query") as mock_build_url:
with mock.patch(
"airflow.providers.standard.operators.trigger_dagrun.build_airflow_url_with_query"
) as mock_build_url:
triggering_task.get_extra_links(triggering_ti, "Triggered DAG")
assert mock_build_url.called
args, _ = mock_build_url.call_args
Expand Down
2 changes: 1 addition & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ def test_derived_dag_deps_operator(self):
Tests DAG dependency detection for operators, including derived classes
"""
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

class DerivedOperator(TriggerDagRunOperator):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.timezone import datetime

Expand Down

0 comments on commit 6e4f571

Please sign in to comment.