Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert DagFileProcessor.execute_callbacks to Internal API #28900

Merged
merged 118 commits into from
Sep 22, 2023
Merged
Changes from 1 commit
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
609aebb
Start converting _execute_task_callbacks to internal API. get_templat…
vincbeck Jan 12, 2023
ac48800
Move get_serialized_dag and get_task_instance methods to model classes
vincbeck Jan 16, 2023
b57ddca
Convert handle_failure() method to internal API
vincbeck Jan 18, 2023
917abfb
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 18, 2023
6dfebce
Remove comment
vincbeck Jan 18, 2023
1867db4
Migrate _execute_dag_callbacks to internal API
vincbeck Jan 20, 2023
6b110ea
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 20, 2023
c21fb81
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 25, 2023
d92fa83
Use cls.logger()
vincbeck Jan 25, 2023
cc52f47
Remove todo
vincbeck Feb 2, 2023
ee27659
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 2, 2023
68bfc72
Add back if callbacks
vincbeck Feb 3, 2023
839dc6c
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 6, 2023
30d6ef9
Remove comments
vincbeck Feb 6, 2023
9fb937d
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 9, 2023
81ee61b
Refactore _fetch_callback
vincbeck Feb 9, 2023
3c300a7
Fix TaskInstance.get_task_instance
vincbeck Feb 9, 2023
dc6031e
Fix unit tests
vincbeck Feb 10, 2023
12c908a
Fix unit tests
vincbeck Feb 10, 2023
ccf3a86
Add methods to rpc_endpoint
vincbeck Feb 10, 2023
791ddbe
Add flag select_columns to get_task_instance
vincbeck Feb 10, 2023
120a030
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 10, 2023
4f1ef33
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 10, 2023
e72652e
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 13, 2023
2fca2b3
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Mar 9, 2023
48ef1cf
Use Pydantic-powered ORM models
vincbeck Mar 9, 2023
ee63263
Fix DagRunPydantic
vincbeck Mar 9, 2023
3f5dc7f
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
f64ea0c
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
b61f9e6
Fix imports
vincbeck May 8, 2023
7bed9bc
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
c8a8d20
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 12, 2023
9277e53
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 19, 2023
fea8db0
Convert methods in models to private functions
vincbeck May 19, 2023
1fff88d
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jun 29, 2023
6b80508
Convert methods get_previous_scheduled_dagrun and get_previous_dagrun…
vincbeck Jun 29, 2023
c9ad169
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 4, 2023
dfc01a8
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 4, 2023
6d35f2f
Fix static checks
vincbeck Jul 4, 2023
920aae6
Convert DagRun.get_task_instance to internal API
vincbeck Jul 4, 2023
9492538
Fix circular dependencies
vincbeck Jul 5, 2023
10638cc
Fix spellcheck
vincbeck Jul 5, 2023
0179ba5
Disable type aliasing in pydantc models
vincbeck Jul 5, 2023
995cab1
Fix unit test
vincbeck Jul 5, 2023
03694b4
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 6, 2023
33640a9
Add custom_operator_name attribute to taskinstance Pydantic model
vincbeck Jul 6, 2023
52dd4b0
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 11, 2023
0f042e9
Fix static checks
vincbeck Jul 11, 2023
f463837
Fix unit tests
vincbeck Jul 11, 2023
8cf1fb1
Update reason in context
vincbeck Jul 12, 2023
16c334b
Add annotations to `fetch_callback` method
vincbeck Jul 12, 2023
7192075
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 12, 2023
d29dcd2
Resolve conflicts
vincbeck Jul 12, 2023
de30beb
Improve style of `handle_callback` method
vincbeck Jul 12, 2023
ed97834
Add default encoder
vincbeck Jul 12, 2023
658be83
Address feedbacks from @uranusjr
vincbeck Jul 13, 2023
62b9ba8
Add reason to context
vincbeck Jul 13, 2023
640c819
Fix unit test
vincbeck Jul 13, 2023
87b4fac
Fix params order
vincbeck Jul 14, 2023
67f2ee6
Convert private functions to kwarg only
vincbeck Jul 14, 2023
9914a36
Fix unit test
vincbeck Jul 14, 2023
6881a51
Fix taskinstance unit tests
vincbeck Jul 14, 2023
073ce8b
Fix pydantic unit tests
vincbeck Jul 14, 2023
d32a21e
Apply suggestion by uranujs@
vincbeck Jul 14, 2023
927344a
Address feedbacks from uranusjr@
vincbeck Jul 14, 2023
b42390b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 17, 2023
ea1d9cd
Apply D205 style rule
vincbeck Jul 17, 2023
d87346f
Fix get_serialized_dag method
vincbeck Jul 17, 2023
d3480a7
Skipping some tests if AIP-44 if disabled
vincbeck Jul 17, 2023
638f149
Remove _get_task_instance function
vincbeck Jul 18, 2023
3249ce4
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 20, 2023
9bb04fe
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 21, 2023
67fd268
Remove get_dagrun function
vincbeck Jul 21, 2023
c6cc066
Remove get_task_instances method
vincbeck Jul 21, 2023
e782d50
Remove get_task_instance method
vincbeck Jul 21, 2023
9a5e1ab
Fix static checks
vincbeck Jul 21, 2023
bdd2370
Revert "Remove get_task_instance method"
vincbeck Jul 21, 2023
4c1d4fa
Revert "Remove get_task_instances method"
vincbeck Jul 24, 2023
7f73bc1
Revert "Remove get_dagrun function"
vincbeck Jul 24, 2023
0ae55e1
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 24, 2023
72d106e
Fix static checks
vincbeck Jul 24, 2023
10a511f
Fix tests
vincbeck Jul 24, 2023
c4d385d
Leave annotations
vincbeck Jul 25, 2023
c80105b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 25, 2023
ae79b6b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 18, 2023
46d2e25
Fix static checks
vincbeck Aug 18, 2023
8a71857
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 29, 2023
4796186
Add type annotation to _stop_remaining_tasks()
vincbeck Aug 29, 2023
9ac5d30
Add type annotation to handle_callback()
vincbeck Aug 29, 2023
8ca799b
Pass `dagrun_id` to fetch_callback()
vincbeck Aug 29, 2023
e795915
Pass `dag_run_id` to fetch_task_instance()
vincbeck Aug 29, 2023
a14f941
Pass `dag_run_id` to get_previous_scheduled_dagrun()
vincbeck Aug 29, 2023
b2d5bc9
Remove `_set_duration` and introduce `set_end_date`
vincbeck Aug 29, 2023
975eba8
Fix `set_end_date`
vincbeck Aug 30, 2023
adc10fd
Add TaskInstancePydantic.update_forward_refs()
vincbeck Aug 30, 2023
7921dc5
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 31, 2023
f76bc56
Replace `save_to_db()` to `finish_task()`
vincbeck Aug 31, 2023
4178b7c
Fix static checks
vincbeck Aug 31, 2023
2a3edf1
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 1, 2023
8d35a1e
Fix static checks
vincbeck Sep 1, 2023
2de2375
Cleanup
vincbeck Sep 1, 2023
85743a8
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 5, 2023
5882674
Fix tests
vincbeck Sep 5, 2023
10183ac
Fix tests
vincbeck Sep 5, 2023
c6fb38b
Make pydantic models ignore TCH001 rule
vincbeck Sep 5, 2023
5682a56
Use List instead of list in Pydantic models
vincbeck Sep 5, 2023
cbd4757
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 6, 2023
df2ff72
Fix tests
vincbeck Sep 6, 2023
512872d
Add `refresh_from_db` after setting the duration
vincbeck Sep 6, 2023
a44ac1a
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 7, 2023
dee5655
Fix import
vincbeck Sep 7, 2023
8c60f21
Revert "Add `refresh_from_db` after setting the duration"
vincbeck Sep 13, 2023
3923f2c
Revert "Cleanup"
vincbeck Sep 13, 2023
989672e
Revert "Replace `save_to_db()` to `finish_task()`"
vincbeck Sep 13, 2023
d51371e
Revert "Fix `set_end_date`"
vincbeck Sep 13, 2023
2b9014b
Revert "Remove `_set_duration` and introduce `set_end_date`"
vincbeck Sep 13, 2023
c6573c7
Add `dataset` property to `DatasetEventPydantic`
vincbeck Sep 13, 2023
0653a33
Remove assert
vincbeck Sep 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move get_serialized_dag and get_task_instance methods to model classes
vincbeck committed Jan 16, 2023
commit ac4880086909d3b8409397b0ef1f104d9720e190
16 changes: 0 additions & 16 deletions airflow/api_internal/actions/__init__.py

This file was deleted.

42 changes: 0 additions & 42 deletions airflow/api_internal/actions/dag.py

This file was deleted.

56 changes: 0 additions & 56 deletions airflow/api_internal/actions/taskinstance.py

This file was deleted.

9 changes: 4 additions & 5 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
@@ -32,8 +32,6 @@
from sqlalchemy.orm.session import Session

from airflow import settings
from airflow.api_internal.actions.dag import InternalApiDagActions
from airflow.api_internal.actions.taskinstance import InternalApiTaskInstanceActions
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
CallbackRequest,
@@ -48,7 +46,8 @@
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
@@ -691,7 +690,7 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
return

simple_ti = request.simple_task_instance
ti = InternalApiTaskInstanceActions.get_task_instance(
ti = TaskInstance.get_task_instance(
dag_id=simple_ti.dag_id,
run_id=simple_ti.run_id,
task_id=simple_ti.task_id,
@@ -714,7 +713,7 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
# `handle_failure` so that the state of the TI gets progressed.
#
# Since handle_failure _really_ wants a task, we do our best effort to give it one
task = InternalApiDagActions.get_serialized_dag(
task = SerializedDagModel.get_serialized_dag(
dag_id=simple_ti.dag_id, task_id=simple_ti.task_id, session=session
)

22 changes: 20 additions & 2 deletions airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
@@ -24,18 +24,21 @@
from datetime import datetime, timedelta

import sqlalchemy_jsonfield
from sqlalchemy import BigInteger, Column, Index, LargeBinary, String, and_, or_
from sqlalchemy import BigInteger, Column, Index, LargeBinary, String, and_, exc, or_
from sqlalchemy.orm import Session, backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import TaskNotFound
from airflow.models import Operator
from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DAG, DagModel
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.serialization.serialized_objects import DagDependency, SerializedDAG
from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

log = logging.getLogger(__name__)
@@ -368,3 +371,18 @@ def get_dag_dependencies(cls, session: Session = None) -> dict[str, list[DagDepe
else:
iterator = session.query(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies"))
return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator}

@staticmethod
@internal_api_call
@provide_session
def get_serialized_dag(dag_id: str, task_id: str, session: Session = NEW_SESSION) -> Operator | None:
from airflow.models.serialized_dag import SerializedDagModel

try:
model = session.query(SerializedDagModel).get(dag_id)
if model:
return model.dag.get_task(task_id)
except (exc.NoResultFound, TaskNotFound):
pass

return None
33 changes: 31 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@
from sqlalchemy.sql.expression import ColumnOperators, case

from airflow import settings
from airflow.api_internal.actions.taskinstance import InternalApiTaskInstanceActions
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.datasets import Dataset
@@ -84,6 +84,7 @@
UnmappableXComTypePushed,
XComForMappingNotPushed,
)
from airflow.jobs.scheduler_job import TI
from airflow.models.base import Base, StringID
from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
@@ -109,6 +110,7 @@
from airflow.utils.net import get_hostname
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.platform import getuser
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import (
ExecutorConfigType,
@@ -766,6 +768,33 @@ def error(self, session: Session = NEW_SESSION) -> None:
session.merge(self)
session.commit()

@staticmethod
@internal_api_call
@provide_session
def get_task_instance(
dag_id: str,
run_id: str,
task_id: str,
map_index: int,
lock_for_update: bool = False,
session: Session = NEW_SESSION,
) -> TI | None:
# TODO: need to convert SQLAlchemy objects to internal API objects
query = session.query(TI).filter_by(
dag_id=dag_id,
run_id=run_id,
task_id=task_id,
map_index=map_index,
)

if lock_for_update:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does it behave in Internal API?
Does it release the rows when session is destroyed (I expect it) - so just after the results are returned

We may need to think about the scenario when the row-locking is important for potential race conditions or consistency, but I see it's outside of this PR (as we don't use it)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a good idea would be to split that to two methods. We definitely do not want to have a case where we have lock_for_update=True over the internal API so rather than just leave it here, I think we could probably duplicate that method (with some DRY-internal method query) into:



# no internal api 
@provide_session
def get_task_instance_with_lock(
        cls,
        dag_id: str,
        run_id: str,
        task_id: str,
        map_index: int,
        select_columns: bool = False,
        session: Session = NEW_SESSION)
    _get_task_instance(...., lock_for_update=True)



@provide_session
@internal_api_call
def get_task_instance_no_lock(
        cls,
        dag_id: str,
        run_id: str,
        task_id: str,
        map_index: int,
        select_columns: bool = False,
        session: Session = NEW_SESSION):
    _get_task_instance(...., lock_for_update=False)

def _get_task_instance(...

# TODO: pass the logger once resolved here https://github.com/apache/airflow/pull/28502
for attempt in run_with_db_retries(logger=None):
with attempt:
return query.with_for_update().one_or_none()
else:
return query.one_or_none()

@provide_session
def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool = False) -> None:
"""
@@ -776,7 +805,7 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
"""
ti = InternalApiTaskInstanceActions.get_task_instance(
ti = TaskInstance.get_task_instance(
dag_id=self.dag_id,
task_id=self.task_id,
run_id=self.run_id,