diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
index 44efd94f834b15..3becf10703df6c 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
@@ -8,6 +8,7 @@
 import random
 import signal
 import subprocess
+import textwrap
 import time
 from typing import Any, Iterator, Sequence
 
@@ -110,6 +111,48 @@ def _wait_for_dag_finish(
         raise NotReadyError(f"DAG has not finished yet: {dag_run['state']}")
 
 
+def _dump_dag_logs(airflow_instance: AirflowInstance, dag_id: str) -> None:
+    # Get the dag run info
+    res = airflow_instance.session.get(
+        f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns", timeout=5
+    )
+    res.raise_for_status()
+    dag_run = res.json()["dag_runs"][0]
+    dag_run_id = dag_run["dag_run_id"]
+
+    # List the tasks in the dag run
+    res = airflow_instance.session.get(
+        f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
+        timeout=5,
+    )
+    res.raise_for_status()
+    task_instances = res.json()["task_instances"]
+
+    # Sort tasks by start_date to maintain execution order
+    task_instances.sort(key=lambda x: x["start_date"] or "")
+
+    print(f"\nTask execution order for DAG {dag_id}:")
+    for task in task_instances:
+        task_id = task["task_id"]
+        state = task["state"]
+        try_number = task.get("try_number", 1)
+
+        task_header = f"Task: {task_id} (State: {state}; Try: {try_number})"
+
+        # Get logs for the task's latest try number
+        try:
+            res = airflow_instance.session.get(
+                f"{airflow_instance.airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
+                f"/taskInstances/{task_id}/logs/{try_number}",
+                params={"full_content": "true"},
+                timeout=5,
+            )
+            res.raise_for_status()
+            print(f"\n=== {task_header} ===\n{textwrap.indent(res.text, '    ')}")
+        except Exception as e:
+            print(f"Failed to fetch logs for {task_header}: {e}")
+
+
 @contextlib.contextmanager
 def _run_airflow(
     tmp_path: pathlib.Path,
@@ -377,6 +420,11 @@ def test_airflow_plugin(
     print("Sleeping for a few seconds to let the plugin finish...")
     time.sleep(10)
 
+    try:
+        _dump_dag_logs(airflow_instance, dag_id)
+    except Exception as e:
+        print(f"Failed to dump DAG logs: {e}")
+
     if dag_id == DAG_TO_SKIP_INGESTION:
         # Verify that no MCPs were generated.
         assert not os.path.exists(airflow_instance.metadata_file)
diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini
index 2e4596a24c2a6c..28c0b9532bcb8e 100644
--- a/metadata-ingestion-modules/airflow-plugin/tox.ini
+++ b/metadata-ingestion-modules/airflow-plugin/tox.ini
@@ -59,6 +59,6 @@ commands =
 [testenv:py310-airflow24]
 extras = dev,integration-tests,plugin-v2,test-airflow24
 
-[testenv:py310-airflow{26,27,28},py311-airflow{29,210}]
+[testenv:py3{10,11}-airflow{26,27,28,29,210}]
 extras = dev,integration-tests,plugin-v2
 
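Note on the tox.ini change: the replacement line uses tox's generative environment names, where each brace group expands combinatorially, so a single line now covers the full Python x Airflow matrix, including combinations such as py311-airflow26 that the old two-group line omitted. The following Python sketch of the expansion is purely illustrative and not part of the patch:

    # Illustration only (not part of the patch): how tox expands the
    # generative env name "py3{10,11}-airflow{26,27,28,29,210}",
    # producing one environment per combination of brace values.
    from itertools import product

    pythons = ["10", "11"]
    airflows = ["26", "27", "28", "29", "210"]

    envs = [f"py3{py}-airflow{af}" for py, af in product(pythons, airflows)]
    print(len(envs))          # 10 environments
    print(envs[0], envs[-1])  # py310-airflow26 py311-airflow210

Each expanded name can be run individually, e.g. `tox -e py311-airflow29`, and all ten pick up the same `extras = dev,integration-tests,plugin-v2` setting.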