diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 9bcb0c9176f5b..8452950e0467f 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1171,10 +1171,7 @@ def _schedule_dag_run( msg='timed_out', ) - # Send SLA & DAG Success/Failure Callbacks to be executed - self._send_dag_callbacks_to_processor(dag, callback_to_execute) - # Because we send the callback here, we need to return None - return callback + return callback_to_execute if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates: self.log.error("Execution date is in future: %s", dag_run.execution_date) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index db6df7dfeb911..27931d460aa0c 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -46,7 +46,7 @@ from airflow.jobs.base_job import BaseJob from airflow.jobs.local_task_job import LocalTaskJob from airflow.jobs.scheduler_job import SchedulerJob -from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance +from airflow.models import DAG, DagBag, DagModel, DbCallbackRequest, Pool, TaskInstance from airflow.models.dagrun import DagRun from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey @@ -1613,7 +1613,6 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag - self.scheduler_job.executor = MockExecutor() session = settings.Session() orm_dag = session.query(DagModel).get(dag.dag_id) @@ -1639,7 +1638,7 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): # Mock that processor_agent is started self.scheduler_job.processor_agent = mock.Mock() - self.scheduler_job._schedule_dag_run(dr, session) + callback = self.scheduler_job._schedule_dag_run(dr, session) session.flush() session.refresh(dr) @@ -1658,8 +1657,8 @@ def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): msg="timed_out", ) - # Verify dag failure callback request is sent to file processor - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + # Verify dag failure callback request is sent + assert callback == expected_callback session.rollback() session.close() @@ -1680,12 +1679,11 @@ def test_dagrun_timeout_fails_run(self, dag_maker): self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.dagbag = dag_maker.dagbag - self.scheduler_job.executor = MockExecutor() # Mock that processor_agent is started self.scheduler_job.processor_agent = mock.Mock() - self.scheduler_job._schedule_dag_run(dr, session) + callback = self.scheduler_job._schedule_dag_run(dr, session) session.flush() session.refresh(dr) @@ -1699,8 +1697,8 @@ def test_dagrun_timeout_fails_run(self, dag_maker): msg="timed_out", ) - # Verify dag failure callback request is sent to file processor - self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) + # Verify dag failure callback request is sent + assert callback == expected_callback session.rollback() session.close() @@ -1780,6 +1778,42 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak session.rollback() session.close() + def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session): + with dag_maker( + dag_id='test_dagrun_timeout_callbacks_are_stored_in_database', + on_failure_callback=lambda x: print("failed"), + dagrun_timeout=timedelta(hours=1), + ) as dag: + EmptyOperator(task_id='empty') + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor() + self.scheduler_job.executor.callback_sink = DatabaseCallbackSink() + self.scheduler_job.dagbag = dag_maker.dagbag + self.scheduler_job.processor_agent = mock.Mock() + + dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE) + + with mock.patch.object(settings, "USE_JOB_SCHEDULE", False): + self.scheduler_job._do_scheduling(session) + + callback = ( + session.query(DbCallbackRequest) + .order_by(DbCallbackRequest.id.desc()) + .first() + .get_callback_request() + ) + + expected_callback = DagCallbackRequest( + full_filepath=dag.fileloc, + dag_id=dr.dag_id, + is_failure_callback=True, + run_id=dr.run_id, + msg='timed_out', + ) + + assert callback == expected_callback + def test_dagrun_callbacks_commited_before_sent(self, dag_maker): """ Tests that before any callbacks are sent to the processor, the session is committed. This ensures