Skip to content

Commit

Permalink
Do not return ongoin dagrun when a end_date is less than utcnow (#33488)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7c51c87)
  • Loading branch information
pierrejeambrun authored and ephraimbuddy committed Aug 28, 2023
1 parent ea432df commit 5c8e799
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
5 changes: 4 additions & 1 deletion airflow/www/static/js/cluster-activity/useFilters.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ export const now = date.toISOString();
const useFilters = (): FilterHookReturn => {
const [searchParams, setSearchParams] = useSearchParams();

const endDate = searchParams.get(END_DATE_PARAM) || now;
const endDate =
searchParams.get(END_DATE_PARAM) ||
// @ts-ignore
moment(now).add(1, "h").toISOString();
const startDate =
searchParams.get(START_DATE_PARAM) ||
// @ts-ignore
Expand Down
13 changes: 7 additions & 6 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from markupsafe import Markup, escape
from pendulum.datetime import DateTime
from pendulum.parsing.exceptions import ParserError
from sqlalchemy import Date, and_, case, desc, func, inspect, or_, select, union_all
from sqlalchemy import Date, and_, case, desc, func, inspect, select, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from wtforms import BooleanField, validators
Expand Down Expand Up @@ -3739,13 +3739,14 @@ def historical_metrics_data(self):
"""Returns cluster activity historical metrics."""
start_date = _safe_parse_datetime(request.args.get("start_date"))
end_date = _safe_parse_datetime(request.args.get("end_date"))

with create_session() as session:
# DagRuns
dag_runs_type = session.execute(
dag_run_types = session.execute(
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
)
.group_by(DagRun.run_type)
).all()
Expand All @@ -3754,7 +3755,7 @@ def historical_metrics_data(self):
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
)
.group_by(DagRun.state)
).all()
Expand All @@ -3765,15 +3766,15 @@ def historical_metrics_data(self):
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
)
.group_by(TaskInstance.state)
).all()

data = {
"dag_run_types": {
**{dag_run_type.value: 0 for dag_run_type in DagRunType},
**{run_type: sum_value for run_type, sum_value in dag_runs_type},
**{run_type: sum_value for run_type, sum_value in dag_run_types},
},
"dag_run_states": {
**{dag_run_state.value: 0 for dag_run_state in DagRunState},
Expand Down
25 changes: 18 additions & 7 deletions tests/www/views/test_views_cluster_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def freeze_time_for_dagruns(time_machine):


@pytest.fixture
def make_dag_runs(dag_maker, session):
def make_dag_runs(dag_maker, session, time_machine):
with dag_maker(
dag_id="test_dag_id",
serialized=True,
Expand All @@ -76,29 +76,40 @@ def make_dag_runs(dag_maker, session):
start_date=dag_maker.dag.next_dagrun_info(date).logical_date,
)

run3 = dag_maker.create_dagrun(
run_id="run_3",
state=DagRunState.RUNNING,
run_type=DagRunType.SCHEDULED,
execution_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
start_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
)
run3.end_date = None

for ti in run1.task_instances:
ti.state = TaskInstanceState.SUCCESS

for ti in run2.task_instances:
ti.state = TaskInstanceState.FAILED

time_machine.move_to("2023-07-02T00:00:00+00:00", tick=False)

session.flush()


@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data(admin_client, session):
def test_historical_metrics_data(admin_client, session, time_machine):
resp = admin_client.get(
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-05-02T00:00",
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-08-02T00:00",
follow_redirects=True,
)
assert resp.status_code == 200
assert resp.json == {
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 1},
"dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, "scheduled": 1},
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
"dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
Expand All @@ -117,7 +128,7 @@ def test_historical_metrics_data(admin_client, session):
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data_date_filters(admin_client, session):
resp = admin_client.get(
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-05-02T00:00",
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-06-02T00:00",
follow_redirects=True,
)
assert resp.status_code == 200
Expand Down

0 comments on commit 5c8e799

Please sign in to comment.