From 9cb56cc5038131fad652f2ed0e9c89d68e3f2e8d Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Fri, 8 Nov 2024 15:17:30 +0100 Subject: [PATCH] utilize more information to deterministically generate OpenLineage run_id Signed-off-by: Maciej Obuchowski --- .../providers/dbt/cloud/utils/openlineage.py | 2 + .../providers/openlineage/plugins/adapter.py | 4 +- .../providers/openlineage/plugins/listener.py | 8 +++ .../providers/openlineage/plugins/macros.py | 2 + .../providers/openlineage/utils/utils.py | 6 +- .../tests/openlineage/plugins/test_adapter.py | 30 +++++++++ .../openlineage/plugins/test_execution.py | 1 - .../openlineage/plugins/test_listener.py | 66 +++++++++---------- 8 files changed, 78 insertions(+), 41 deletions(-) diff --git a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py index 41b1f9635fd1c..d7a9ad700b83d 100644 --- a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py @@ -156,6 +156,8 @@ async def get_artifacts_for_steps(steps, artifacts): task_id=operator.task_id, logical_date=_get_logical_date(task_instance), try_number=_get_try_number(task_instance), + queued_dttm=task_instance.queued_dttm, + map_index=task_instance.map_index, ) parent_job = ParentRunMetadata( diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py index 688f2d65a547c..20331a62b3544 100644 --- a/providers/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py @@ -129,11 +129,13 @@ def build_task_instance_run_id( task_id: str, try_number: int, logical_date: datetime, + queued_dttm: datetime, + map_index: int, ): return str( generate_static_uuid( instant=logical_date, - data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(), + 
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{(queued_dttm or logical_date).isoformat()}.{map_index}".encode(), ) ) diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py index 7feece437a620..aa2ce98a71090 100644 --- a/providers/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/src/airflow/providers/openlineage/plugins/listener.py @@ -143,11 +143,14 @@ def on_running(): logical_date = task_instance.logical_date else: logical_date = task_instance.execution_date + task_uuid = self.adapter.build_task_instance_run_id( dag_id=dag.dag_id, task_id=task.task_id, try_number=task_instance.try_number, logical_date=logical_date, + queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date, + map_index=task_instance.map_index, ) event_type = RunState.RUNNING.value.lower() operator_name = task.task_type.lower() @@ -231,6 +234,8 @@ def on_success(): task_id=task.task_id, try_number=_get_try_number_success(task_instance), logical_date=logical_date, + queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date, + map_index=task_instance.map_index, ) event_type = RunState.COMPLETE.value.lower() operator_name = task.task_type.lower() @@ -329,11 +334,14 @@ def on_failure(): logical_date = task_instance.logical_date else: logical_date = task_instance.execution_date + task_uuid = self.adapter.build_task_instance_run_id( dag_id=dag.dag_id, task_id=task.task_id, try_number=task_instance.try_number, logical_date=logical_date, + queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date, + map_index=task_instance.map_index, ) event_type = RunState.FAIL.value.lower() operator_name = task.task_type.lower() diff --git a/providers/src/airflow/providers/openlineage/plugins/macros.py b/providers/src/airflow/providers/openlineage/plugins/macros.py index 69af17321e4bd..e8bbc2c4c7f91 100644 --- 
a/providers/src/airflow/providers/openlineage/plugins/macros.py +++ b/providers/src/airflow/providers/openlineage/plugins/macros.py @@ -67,6 +67,8 @@ def lineage_run_id(task_instance: TaskInstance): task_id=task_instance.task_id, try_number=task_instance.try_number, logical_date=logical_date, + queued_dttm=task_instance.queued_dttm, + map_index=task_instance.map_index, ) diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py index 99faa3c4d5cec..faf14bdbb7b3b 100644 --- a/providers/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/src/airflow/providers/openlineage/utils/utils.py @@ -660,11 +660,11 @@ def decorator(f): def wrapper(*args, **kwargs): try: return f(*args, **kwargs) - except Exception as e: + except Exception: log.warning( - "Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. If you see that, task has completed successfully despite not getting OL events." + "OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. 
This has no impact on actual task execution status.", + exc_info=True, ) - log.warning(e) return wrapper diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py index 576f6df830c31..44261fe6e0e23 100644 --- a/providers/tests/openlineage/plugins/test_adapter.py +++ b/providers/tests/openlineage/plugins/test_adapter.py @@ -900,6 +900,8 @@ def test_build_task_instance_run_id_is_valid_uuid(): task_id="task_id", try_number=1, logical_date=datetime.datetime.now(), + queued_dttm=datetime.datetime.now(), + map_index=-1, ) uuid_result = uuid.UUID(result) assert uuid_result @@ -912,28 +914,56 @@ def test_build_task_instance_run_id_same_input_gives_same_result(): task_id="task1", try_number=1, logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1), + map_index=-1, ) result2 = OpenLineageAdapter.build_task_instance_run_id( dag_id="dag1", task_id="task1", try_number=1, logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1), + map_index=-1, ) assert result1 == result2 +def test_build_task_instance_run_id_different_map_index_gives_different_result(): + result1 = OpenLineageAdapter.build_task_instance_run_id( + dag_id="dag1", + task_id="task1", + try_number=1, + logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1), + map_index=1, + ) + result2 = OpenLineageAdapter.build_task_instance_run_id( + dag_id="dag1", + task_id="task1", + try_number=1, + logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1), + map_index=2, + ) + assert result1 != result2 + + def test_build_task_instance_run_id_different_inputs_gives_different_results(): result1 = OpenLineageAdapter.build_task_instance_run_id( dag_id="dag1", task_id="task1", try_number=1, logical_date=datetime.datetime.now(), + queued_dttm=datetime.datetime.now(), + 
map_index=-1, ) result2 = OpenLineageAdapter.build_task_instance_run_id( dag_id="dag2", task_id="task2", try_number=2, logical_date=datetime.datetime.now(), + queued_dttm=datetime.datetime.now(), + map_index=-1, ) assert result1 != result2 diff --git a/providers/tests/openlineage/plugins/test_execution.py b/providers/tests/openlineage/plugins/test_execution.py index 581054812127d..26aa09078cba8 100644 --- a/providers/tests/openlineage/plugins/test_execution.py +++ b/providers/tests/openlineage/plugins/test_execution.py @@ -63,7 +63,6 @@ def get_sorted_events(event_dir: str) -> list[str]: def has_value_in_events(events, chain, value): x = [get_from_nullable_chain(event, chain) for event in events] - log.error(x) y = [z == value for z in x] return any(y) diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py index d53c23c8b6dea..355da0092e232 100644 --- a/providers/tests/openlineage/plugins/test_listener.py +++ b/providers/tests/openlineage/plugins/test_listener.py @@ -189,13 +189,12 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta """ def mock_dag_id(dag_id, logical_date): - return f"{logical_date}.{dag_id}" + return f"{logical_date.isoformat()}.{dag_id}" - def mock_task_id(dag_id, task_id, try_number, logical_date): - return f"{logical_date}.{dag_id}.{task_id}.{try_number}" + def mock_task_id(dag_id, task_id, try_number, logical_date, queued_dttm, map_index): + return f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{queued_dttm.isoformat()}.{map_index}" listener = OpenLineageListener() - listener.log = mock.Mock() listener.extractor_manager = mock.Mock() metadata = mock.Mock() @@ -216,22 +215,26 @@ def mock_task_id(dag_id, task_id, try_number, logical_date): task_instance.dag_run.data_interval_start = None task_instance.dag_run.data_interval_end = None if AIRFLOW_V_3_0_PLUS: - task_instance.dag_run.logical_date = "2020-01-01T01:01:01" + 
task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1) else: - task_instance.dag_run.execution_date = "2020-01-01T01:01:01" + task_instance.dag_run.execution_date = dt.datetime(2020, 1, 1, 1, 1, 1) task_instance.task = mock.Mock() task_instance.task.task_id = "task_id" task_instance.task.dag = mock.Mock() task_instance.task.dag.dag_id = "dag_id" task_instance.task.dag.description = "Test DAG Description" task_instance.task.dag.owner = "Test Owner" + task_instance.task.inlets = [] + task_instance.task.outlets = [] task_instance.dag_id = "dag_id" task_instance.run_id = "dag_run_run_id" task_instance.try_number = 1 task_instance.state = State.RUNNING task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1) task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1) - task_instance.logical_date = "2020-01-01T01:01:01" + task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1) + task_instance.queued_dttm = dt.datetime(2023, 1, 3, 20, 20, 1) + task_instance.map_index = -1 task_instance.next_method = None # Ensure this is None to reach start_task return listener, task_instance @@ -258,8 +261,8 @@ def test_adapter_start_task_is_called_with_proper_arguments( correctly passed to the adapter. It also verifies that custom facets and Airflow run facets are correctly retrieved and included in the call. This ensures that all relevant data, including custom and Airflow-specific metadata, is accurately conveyed to the adapter during the initialization of a task, - reflecting the comprehensive tracking of task execution contexts. 
- """ + reflecting the comprehensive tracking of task execution contexts.""" + listener, task_instance = _create_listener_and_task_instance() mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} @@ -269,7 +272,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener.on_task_instance_running(None, task_instance, None) listener.adapter.start_task.assert_called_once_with( - run_id="2020-01-01T01:01:01.dag_id.task_id.1", + run_id="2020-01-01T01:01:01.dag_id.task_id.1.2023-01-03T20:20:01.-1", job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01", @@ -291,7 +294,6 @@ def test_adapter_start_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") -@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @@ -300,7 +302,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, - mocked_adapter, mock_disabled, mock_debug_mode, ): @@ -312,17 +313,9 @@ def test_adapter_fail_task_is_called_with_proper_arguments( failure events, thus confirming that the adapter's failure handling is functioning as expected. 
""" - def mock_dag_id(dag_id, logical_date): - return f"{logical_date}.{dag_id}" - - def mock_task_id(dag_id, task_id, try_number, logical_date): - return f"{logical_date}.{dag_id}.{task_id}.{try_number}" - listener, task_instance = _create_listener_and_task_instance() - task_instance.logical_date = "2020-01-01T01:01:01" + task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1) mock_get_job_name.return_value = "job_name" - mocked_adapter.build_dag_run_id.side_effect = mock_dag_id - mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_disabled.return_value = False @@ -339,7 +332,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date): job_name="job_name", parent_job_name="dag_id", parent_run_id="2020-01-01T01:01:01.dag_id", - run_id="2020-01-01T01:01:01.dag_id.task_id.1", + run_id="2020-01-01T01:01:01.dag_id.task_id.1.2023-01-03T20:20:01.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ "custom_user_facet": 2, @@ -372,16 +365,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( during the task's lifecycle events. 
""" - def mock_dag_id(dag_id, logical_date): - return f"{logical_date}.{dag_id}" - - def mock_task_id(dag_id, task_id, try_number, logical_date): - return f"{logical_date}.{dag_id}.{task_id}.{try_number}" - listener, task_instance = _create_listener_and_task_instance() mock_get_job_name.return_value = "job_name" - listener.adapter.build_dag_run_id.side_effect = mock_dag_id - listener.adapter.build_task_instance_run_id.side_effect = mock_task_id mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_disabled.return_value = False @@ -396,7 +381,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date): job_name="job_name", parent_job_name="dag_id", parent_run_id="2020-01-01T01:01:01.dag_id", - run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}", + run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.2023-01-03T20:20:01.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ "custom_user_facet": 2, @@ -419,8 +404,10 @@ def test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met listener.adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", task_id="task_id", - logical_date="2020-01-01T01:01:01", + logical_date=dt.datetime(2020, 1, 1, 1, 1, 1), try_number=1, + queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1), + map_index=-1, ) @@ -441,8 +428,10 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth listener.adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", task_id="task_id", - logical_date="2020-01-01T01:01:01", + logical_date=dt.datetime(2020, 1, 1, 1, 1, 1), try_number=1, + queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1), + map_index=-1, ) @@ -459,8 +448,10 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met listener.adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", 
task_id="task_id", - logical_date="2020-01-01T01:01:01", + logical_date=dt.datetime(2020, 1, 1, 1, 1, 1), try_number=EXPECTED_TRY_NUMBER_1, + queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1), + map_index=-1, ) @@ -701,9 +692,12 @@ def simple_callable(**kwargs): run_id=run_id, **triggered_by_kwargs, ) # type: ignore - self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id) - self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id) + self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, map_index=-1) + self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, map_index=-1) self.task_instance_1.dag_run = self.task_instance_2.dag_run = self.dagrun + self.task_instance_2.queued_dttm = self.task_instance_1.queued_dttm = dt.datetime( + 2023, 1, 1, 1, 10, 10 + ) @pytest.mark.parametrize( "selective_enable, enable_dag, expected_call_count",