Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener: Set task on sqlalchemy taskinstance object #27167

Merged
merged 1 commit into from
Oct 20, 2022

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Oct 20, 2022

same as #21157

TaskListener API's contract promises to pass TaskInstance object to listener plugin. However, what happens is not 100% true - the object being passed is one that maps to current SQLAlchemy session.

_run_raw_task before merging the TI operates on detached TaskInstance object, then merges it to current session. Since there is no attached object in the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's being passed to the SQLAlchemy event listeners.

The problem with that is that when creating new SQLAlchemy object, SQLAlchemy takes care about setting only database-mapped fields. The ones that are purely on the python side, like task aren't being set on the new object.

This PR manually sets task on the new SQLAlchemy object, so that on_task_instance_success and on_task_instance_failed receives proper TaskInstance with task field set.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kaxil kaxil requested review from XD-DENG and ashb as code owners October 20, 2022 18:47
@kaxil kaxil requested a review from dstandish October 20, 2022 18:47
same as apache#21157

TaskListener API's contract promises to pass TaskInstance object to listener plugin. However, what happens is not 100% true - the object being passed is one that maps to current SQLAlchemy session.

`_run_raw_task` before merging the TI operates on detached TaskInstance object, then merges it to current session. Since there is no attached object in the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's being passed to the SQLAlchemy event listeners.

The problem with that is that when creating new SQLAlchemy object, SQLAlchemy takes care about setting only database-mapped fields. The ones that are purely on the python side, like task aren't being set on the new object.

This PR manually sets `task` on the new SQLAlchemy object, so that `on_task_instance_success` receives proper TaskInstance with task field set.
@kaxil kaxil force-pushed the task-listener-fix branch from 5a7b901 to 00aac43 Compare October 20, 2022 19:33
@kaxil kaxil added this to the Airflow 2.4.2 milestone Oct 20, 2022
@kaxil kaxil merged commit 395ad71 into apache:main Oct 20, 2022
@kaxil kaxil deleted the task-listener-fix branch October 20, 2022 20:19
@kaxil kaxil modified the milestones: Airflow 2.4.2, Airflow 2.4.3 Oct 20, 2022
ephraimbuddy pushed a commit that referenced this pull request Nov 9, 2022
same as #21157

TaskListener API's contract promises to pass TaskInstance object to listener plugin. However, what happens is not 100% true - the object being passed is one that maps to current SQLAlchemy session.

`_run_raw_task` before merging the TI operates on detached TaskInstance object, then merges it to current session. Since there is no attached object in the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's being passed to the SQLAlchemy event listeners.

The problem with that is that when creating new SQLAlchemy object, SQLAlchemy takes care about setting only database-mapped fields. The ones that are purely on the python side, like task aren't being set on the new object.

This PR manually sets `task` on the new SQLAlchemy object, so that `on_task_instance_success` receives proper TaskInstance with task field set.

(cherry picked from commit 395ad71)
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Nov 9, 2022
ephraimbuddy pushed a commit that referenced this pull request Nov 9, 2022
same as #21157

TaskListener API's contract promises to pass TaskInstance object to listener plugin. However, what happens is not 100% true - the object being passed is one that maps to current SQLAlchemy session.

`_run_raw_task` before merging the TI operates on detached TaskInstance object, then merges it to current session. Since there is no attached object in the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's being passed to the SQLAlchemy event listeners.

The problem with that is that when creating new SQLAlchemy object, SQLAlchemy takes care about setting only database-mapped fields. The ones that are purely on the python side, like task aren't being set on the new object.

This PR manually sets `task` on the new SQLAlchemy object, so that `on_task_instance_success` receives proper TaskInstance with task field set.

(cherry picked from commit 395ad71)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants