Skip to content

Commit

Permalink
Fix and speed up grid view (#23947)
Browse files Browse the repository at this point in the history
This fetches all TIs for a given task across dag runs, leading to
significantly faster response times. It also fixes a bug where Nones
were being passed to the UI when a new task was added to a DAG with
existing runs.

(cherry picked from commit 1cf483f)
  • Loading branch information
jedcunningham committed May 26, 2022
1 parent a88900c commit 2560dd5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
57 changes: 28 additions & 29 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,39 +128,38 @@ def get_mapped_summary(parent_instance, task_instances):
}


def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
task_instance = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == task.dag_id,
TaskInstance.run_id == dag_run.run_id,
TaskInstance.task_id == task.task_id,
# Only get normal task instances or the first mapped task
TaskInstance.map_index <= 0,
)
.first()
def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]:
tis = session.query(TaskInstance).filter(
TaskInstance.dag_id == task.dag_id,
TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
TaskInstance.task_id == task.task_id,
# Only get normal task instances or the first mapped task
TaskInstance.map_index <= 0,
)

if not task_instance:
return None
def _get_summary(task_instance):
if task_instance.map_index > -1:
return get_mapped_summary(
task_instance, task_instances=get_mapped_instances(task_instance, session)
)

if task_instance.map_index > -1:
return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
try_count = (
task_instance.prev_attempted_tries
if task_instance.prev_attempted_tries != 0
else task_instance.try_number
)

try_count = (
task_instance.prev_attempted_tries
if task_instance.prev_attempted_tries != 0
else task_instance.try_number
)
return {
'task_id': task_instance.task_id,
'run_id': task_instance.run_id,
'map_index': task_instance.map_index,
'state': task_instance.state,
'start_date': datetime_to_string(task_instance.start_date),
'end_date': datetime_to_string(task_instance.end_date),
'try_number': try_count,
}
return {
'task_id': task_instance.task_id,
'run_id': task_instance.run_id,
'map_index': task_instance.map_index,
'state': task_instance.state,
'start_date': datetime_to_string(task_instance.start_date),
'end_date': datetime_to_string(task_instance.end_date),
'try_number': try_count,
}

return [_get_summary(ti) for ti in tis]


def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
if isinstance(task_item_or_group, AbstractOperator):
return {
'id': task_item_or_group.task_id,
'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
'instances': wwwutils.get_task_summaries(task_item_or_group, dag_runs, session),
'label': task_item_or_group.label,
'extra_links': task_item_or_group.extra_links,
'is_mapped': task_item_or_group.is_mapped,
Expand Down

0 comments on commit 2560dd5

Please sign in to comment.