Skip to content

Commit

Permalink
Fix set_end_date
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck committed Aug 30, 2023
1 parent b2d5bc9 commit 975eba8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
16 changes: 11 additions & 5 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1724,12 +1724,15 @@ def set_state(self, state: str | None, session: Session = NEW_SESSION) -> bool:
def set_end_date(
cls,
dag_id: str,
run_id: str,
task_id: str,
execution_date: datetime | None,
map_index: int,
end_date: datetime,
session: Session = NEW_SESSION,
):
task_instance = session.get(dag_id=dag_id, task_id=task_id, execution_date=execution_date)
task_instance = session.get(
TaskInstance, {"task_id": task_id, "dag_id": dag_id, "run_id": run_id, "map_index": map_index}
)
task_instance.end_date = end_date

if task_instance.start_date:
Expand Down Expand Up @@ -2308,8 +2311,9 @@ def _run_raw_task(
_log_state(task_instance=self)
TaskInstance.set_end_date(
dag_id=self.dag_id,
run_id=self.run_id,
task_id=self.task_id,
execution_date=self.execution_date,
map_index=self.map_index,
end_date=timezone.utcnow(),
session=session,
)
Expand Down Expand Up @@ -2533,8 +2537,9 @@ def _handle_reschedule(

TaskInstance.set_end_date(
dag_id=self.dag_id,
run_id=self.run_id,
task_id=self.task_id,
execution_date=self.execution_date,
map_index=self.map_index,
end_date=timezone.utcnow(),
session=session,
)
Expand Down Expand Up @@ -2621,8 +2626,9 @@ def fetch_handle_failure_context(

TaskInstance.set_end_date(
dag_id=ti.dag_id,
run_id=ti.run_id,
task_id=ti.task_id,
execution_date=ti.execution_date,
map_index=ti.map_index,
end_date=timezone.utcnow(),
session=session,
)
Expand Down
6 changes: 4 additions & 2 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1993,8 +1993,9 @@ def test_set_end_date(self, session):

TaskInstance.set_end_date(
dag_id=ti.dag_id,
run_id=ti.run_id,
task_id=ti.task_id,
execution_date=ti.execution_date,
map_index=ti.map_index,
end_date=datetime.datetime(2018, 10, 1, 2),
session=session,
)
Expand All @@ -2009,8 +2010,9 @@ def test_set_duration_empty_dates(self, session):

TaskInstance.set_end_date(
dag_id=ti.dag_id,
run_id=ti.run_id,
task_id=ti.task_id,
execution_date=ti.execution_date,
map_index=ti.map_index,
end_date=datetime.datetime(2018, 10, 1, 2),
session=session,
)
Expand Down

0 comments on commit 975eba8

Please sign in to comment.