Skip to content

Commit

Permalink
Send DAG timeout callbacks to processor outside of prohibit_commit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tanelk authored Jul 6, 2022
1 parent 8053876 commit 438d13e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
5 changes: 1 addition & 4 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,10 +1171,7 @@ def _schedule_dag_run(
msg='timed_out',
)

# Send SLA & DAG Success/Failure Callbacks to be executed
self._send_dag_callbacks_to_processor(dag, callback_to_execute)
# Because we send the callback here, we need to return None
return callback
return callback_to_execute

if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
self.log.error("Execution date is in future: %s", dag_run.execution_date)
Expand Down
52 changes: 43 additions & 9 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from airflow.jobs.base_job import BaseJob
from airflow.jobs.local_task_job import LocalTaskJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
from airflow.models import DAG, DagBag, DagModel, DbCallbackRequest, Pool, TaskInstance
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
Expand Down Expand Up @@ -1613,7 +1613,6 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.dagbag = dag_maker.dagbag
self.scheduler_job.executor = MockExecutor()

session = settings.Session()
orm_dag = session.query(DagModel).get(dag.dag_id)
Expand All @@ -1639,7 +1638,7 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
# Mock that processor_agent is started
self.scheduler_job.processor_agent = mock.Mock()

self.scheduler_job._schedule_dag_run(dr, session)
callback = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()

session.refresh(dr)
Expand All @@ -1658,8 +1657,8 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
msg="timed_out",
)

# Verify dag failure callback request is sent to file processor
self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
# Verify dag failure callback request is sent
assert callback == expected_callback

session.rollback()
session.close()
Expand All @@ -1680,12 +1679,11 @@ def test_dagrun_timeout_fails_run(self, dag_maker):

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.dagbag = dag_maker.dagbag
self.scheduler_job.executor = MockExecutor()

# Mock that processor_agent is started
self.scheduler_job.processor_agent = mock.Mock()

self.scheduler_job._schedule_dag_run(dr, session)
callback = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()

session.refresh(dr)
Expand All @@ -1699,8 +1697,8 @@ def test_dagrun_timeout_fails_run(self, dag_maker):
msg="timed_out",
)

# Verify dag failure callback request is sent to file processor
self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
# Verify dag failure callback request is sent
assert callback == expected_callback

session.rollback()
session.close()
Expand Down Expand Up @@ -1780,6 +1778,42 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak
session.rollback()
session.close()

def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session):
with dag_maker(
dag_id='test_dagrun_timeout_callbacks_are_stored_in_database',
on_failure_callback=lambda x: print("failed"),
dagrun_timeout=timedelta(hours=1),
) as dag:
EmptyOperator(task_id='empty')

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor()
self.scheduler_job.executor.callback_sink = DatabaseCallbackSink()
self.scheduler_job.dagbag = dag_maker.dagbag
self.scheduler_job.processor_agent = mock.Mock()

dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE)

with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.scheduler_job._do_scheduling(session)

callback = (
session.query(DbCallbackRequest)
.order_by(DbCallbackRequest.id.desc())
.first()
.get_callback_request()
)

expected_callback = DagCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
msg='timed_out',
)

assert callback == expected_callback

def test_dagrun_callbacks_commited_before_sent(self, dag_maker):
"""
Tests that before any callbacks are sent to the processor, the session is committed. This ensures
Expand Down

0 comments on commit 438d13e

Please sign in to comment.