feat(airflow): show dag/task logs in CI (#11981)
hsheth2 authored Dec 2, 2024
1 parent 580390e · commit 411e1a3
Showing 2 changed files with 49 additions and 1 deletion.
@@ -8,6 +8,7 @@
import random
import signal
import subprocess
import textwrap
import time
from typing import Any, Iterator, Sequence

@@ -110,6 +111,48 @@ def _wait_for_dag_finish(
        raise NotReadyError(f"DAG has not finished yet: {dag_run['state']}")


def _dump_dag_logs(airflow_instance: AirflowInstance, dag_id: str) -> None:
    # Get the dag run info
    res = airflow_instance.session.get(
        f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns", timeout=5
    )
    res.raise_for_status()
    dag_run = res.json()["dag_runs"][0]
    dag_run_id = dag_run["dag_run_id"]

    # List the tasks in the dag run
    res = airflow_instance.session.get(
        f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
        timeout=5,
    )
    res.raise_for_status()
    task_instances = res.json()["task_instances"]

    # Sort tasks by start_date to maintain execution order
    task_instances.sort(key=lambda x: x["start_date"] or "")

    print(f"\nTask execution order for DAG {dag_id}:")
    for task in task_instances:
        task_id = task["task_id"]
        state = task["state"]
        try_number = task.get("try_number", 1)

        task_header = f"Task: {task_id} (State: {state}; Try: {try_number})"

        # Get logs for the task's latest try number
        try:
            res = airflow_instance.session.get(
                f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
                f"/taskInstances/{task_id}/logs/{try_number}",
                params={"full_content": "true"},
                timeout=5,
            )
            res.raise_for_status()
            print(f"\n=== {task_header} ===\n{textwrap.indent(res.text, ' ')}")
        except Exception as e:
            print(f"Failed to fetch logs for {task_header}: {e}")


@contextlib.contextmanager
def _run_airflow(
    tmp_path: pathlib.Path,
@@ -377,6 +420,11 @@ def test_airflow_plugin(
        print("Sleeping for a few seconds to let the plugin finish...")
        time.sleep(10)

        try:
            _dump_dag_logs(airflow_instance, dag_id)
        except Exception as e:
            print(f"Failed to dump DAG logs: {e}")

        if dag_id == DAG_TO_SKIP_INGESTION:
            # Verify that no MCPs were generated.
            assert not os.path.exists(airflow_instance.metadata_file)
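For context, the new _dump_dag_logs helper only touches three endpoints of Airflow's stable REST API (v1): the DAG-runs list, the task-instances list for a run, and the per-try task log. Below is a minimal standalone sketch of the same calls using plain requests, assuming a local webserver at http://localhost:8080 with admin/admin basic auth — both of those are assumptions for illustration, not part of this commit.

# Sketch only (not part of the commit): the same three API calls the helper
# makes, but without the test harness. Base URL and credentials are assumed.
import requests

AIRFLOW_URL = "http://localhost:8080"  # assumed local Airflow webserver
session = requests.Session()
session.auth = ("admin", "admin")  # assumed credentials

def dump_dag_logs_standalone(dag_id: str) -> None:
    # Latest DAG run for the given DAG.
    runs = session.get(f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns", timeout=5)
    runs.raise_for_status()
    dag_run_id = runs.json()["dag_runs"][0]["dag_run_id"]

    # Task instances belonging to that run.
    tis = session.get(
        f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
        timeout=5,
    )
    tis.raise_for_status()

    # Full log text for each task's latest try.
    for task in tis.json()["task_instances"]:
        task_id = task["task_id"]
        try_number = task.get("try_number", 1)
        logs = session.get(
            f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
            f"/taskInstances/{task_id}/logs/{try_number}",
            params={"full_content": "true"},
            timeout=5,
        )
        logs.raise_for_status()
        print(f"--- {task_id} (try {try_number}) ---\n{logs.text}")

In the test itself, the same requests go through airflow_instance.session, which already carries the correct base URL and credentials.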
metadata-ingestion-modules/airflow-plugin/tox.ini (2 changes: 1 addition & 1 deletion)
@@ -59,6 +59,6 @@ commands =
[testenv:py310-airflow24]
extras = dev,integration-tests,plugin-v2,test-airflow24

-[testenv:py310-airflow{26,27,28},py311-airflow{29,210}]
+[testenv:py3{10,11}-airflow{26,27,28,29,210}]
extras = dev,integration-tests,plugin-v2
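The tox.ini change collapses the previously split py310/py311 environment lists into a single generative section name. The brace groups expand to a cross product, so the extras line now covers ten py3.10/py3.11 × Airflow 2.6–2.10 combinations, including ones the old line did not name (e.g. py311-airflow26, py310-airflow210); whether each is actually run still depends on the envlist/CI matrix. A quick Python illustration of the expansion (tox performs this itself; the snippet is only for clarity):

# Illustration only: how the generative section name
# [testenv:py3{10,11}-airflow{26,27,28,29,210}] expands.
from itertools import product

envs = [
    f"py3{py}-airflow{af}"
    for py, af in product(["10", "11"], ["26", "27", "28", "29", "210"])
]
print(len(envs), envs[0], envs[-1])  # 10 py310-airflow26 py311-airflow210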
