From 0f12806874aef406edd8c0b7fcabff0f474c0743 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Thu, 20 Oct 2022 21:19:50 +0100
Subject: [PATCH] Listener: Set task on sqlalchemy taskinstance object (#27167)

same as https://github.com/apache/airflow/pull/21157

The TaskListener API's contract promises to pass a TaskInstance object
to the listener plugin. In practice that is not quite what happens: the
object being passed is the one that maps to the current SQLAlchemy
session.

Before merging the TI, `_run_raw_task` operates on a detached
TaskInstance object, then merges it into the current session. Since
there is no attached object in the SQLAlchemy identity map, SQLAlchemy
creates one, and it is this new object that is passed to the SQLAlchemy
event listeners.

The problem is that when creating a new object, SQLAlchemy sets only
the database-mapped fields. Fields that exist purely on the Python
side, like `task`, are not set on the new object.

This PR manually sets `task` on the new SQLAlchemy object, so that
`on_task_instance_success` receives a proper TaskInstance with the
`task` field set.

GitOrigin-RevId: 395ad7110e53a30a5d33f648d1dd797482eb268c
---
 airflow/models/taskinstance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 30fb9b5eb75..4a378ddb18f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1522,7 +1522,7 @@ def _run_raw_task(
 
         if not test_mode:
             session.add(Log(self.state, self))
-            session.merge(self)
+            session.merge(self).task = self.task
             if self.state == TaskInstanceState.SUCCESS:
                 self._register_dataset_changes(session=session)
             session.commit()
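
The behaviour described in the commit message can be reproduced outside Airflow. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database. `FakeTaskInstance` and the string stored in `task` are hypothetical stand-ins for illustration, not Airflow's real model or operator objects.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class FakeTaskInstance(Base):
    """Hypothetical stand-in for TaskInstance; not Airflow's real model."""

    __tablename__ = "fake_task_instance"

    id = Column(Integer, primary_key=True)
    state = Column(String)  # database-mapped field


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)
session = Session(engine)

# Build an object outside the session and give it a Python-only attribute.
ti = FakeTaskInstance(id=1, state="success")
ti.task = "my_operator"  # plain attribute, unknown to the mapper

# merge() finds no matching object in the identity map or the database,
# so it returns a new session-attached copy carrying the mapped fields.
merged = session.merge(ti)
copy_is_new_object = merged is not ti
mapped_field_copied = merged.state == "success"
task_missing_before_fix = not hasattr(merged, "task")

# Same idea as the one-line fix in the patch, written in two steps:
# copy the Python-only attribute onto the merged object by hand.
merged.task = ti.task
task_present_after_fix = merged.task == "my_operator"
```

In this sketch the merged copy carries `state` but has no `task` attribute until it is assigned explicitly, which is the gap that `session.merge(self).task = self.task` closes in `_run_raw_task`.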