Skip to content

Commit

Permalink
Fix auto refresh for graph view (#26926)
Browse files Browse the repository at this point in the history
* Fix auto refresh for graph view

* Add task_instances view test

* Use freezegun to mock datetime

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
pierrejeambrun and uranusjr authored Oct 11, 2022
1 parent cb8c671 commit 6462292
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 27 deletions.
5 changes: 2 additions & 3 deletions airflow/www/static/js/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,10 @@ function handleRefresh() {
// only refresh if the data has changed
if (prevTis !== tis) {
// eslint-disable-next-line no-global-assign
taskInstances = JSON.parse(tis);
updateNodesStates(taskInstances);
updateNodesStates(tis);

// Only redraw the graph if labels have changed
const haveLabelsChanged = updateNodeLabels(nodes, taskInstances);
const haveLabelsChanged = updateNodeLabels(nodes, tis);
if (haveLabelsChanged) draw();

// end refresh if all states are final
Expand Down
271 changes: 247 additions & 24 deletions tests/www/views/test_views_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import urllib.parse
from datetime import timedelta

import freezegun
import pytest

from airflow import settings
Expand Down Expand Up @@ -60,30 +61,31 @@ def reset_dagruns():

@pytest.fixture(autouse=True)
def init_dagruns(app, reset_dagruns):
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
app.dag_bag.get_dag("example_xcom").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
with freezegun.freeze_time(DEFAULT_DATE):
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
app.dag_bag.get_dag("example_xcom").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
yield
clear_db_runs()

Expand Down Expand Up @@ -993,3 +995,224 @@ def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client)
url = f'/dags/{dag.dag_id}/graph'
resp = admin_client.get(url, follow_redirects=True)
assert resp.status_code == 200


def test_task_instances(admin_client):
"""Test task_instances view."""
resp = admin_client.get(
f'/object/task_instances?dag_id=example_bash_operator&execution_date={DEFAULT_DATE}',
follow_redirects=True,
)
assert resp.status_code == 200
assert resp.json == {
'also_run_this': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 2,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'also_run_this',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'run_after_loop': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 2,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'run_after_loop',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'run_this_last': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'EmptyOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 1,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'run_this_last',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'runme_0': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 3,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'runme_0',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'runme_1': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 3,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'runme_1',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'runme_2': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 3,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'runme_2',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
'this_will_skip': {
'dag_id': 'example_bash_operator',
'duration': None,
'end_date': None,
'executor_config': {},
'external_executor_id': None,
'hostname': '',
'job_id': None,
'map_index': -1,
'max_tries': 0,
'next_kwargs': None,
'next_method': None,
'operator': 'BashOperator',
'pid': None,
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 2,
'queue': 'default',
'queued_by_job_id': None,
'queued_dttm': None,
'run_id': 'TEST_DAGRUN',
'start_date': None,
'state': None,
'task_id': 'this_will_skip',
'trigger_id': None,
'trigger_timeout': None,
'try_number': 1,
'unixname': 'root',
'updated_at': DEFAULT_DATE.isoformat(),
},
}

0 comments on commit 6462292

Please sign in to comment.