diff --git a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
index 41b1f9635fd1c..d7a9ad700b83d 100644
--- a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -156,6 +156,8 @@ async def get_artifacts_for_steps(steps, artifacts):
         task_id=operator.task_id,
         logical_date=_get_logical_date(task_instance),
         try_number=_get_try_number(task_instance),
+        queued_dttm=task_instance.queued_dttm,
+        map_index=task_instance.map_index,
     )

     parent_job = ParentRunMetadata(
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index 688f2d65a547c..535618529c135 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -129,11 +129,12 @@ def build_task_instance_run_id(
         task_id: str,
         try_number: int,
         logical_date: datetime,
+        map_index: int,
     ):
         return str(
             generate_static_uuid(
                 instant=logical_date,
-                data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
+                data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
             )
         )

diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py
index 7feece437a620..6a539ea27f4ce 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -42,6 +42,7 @@
     get_user_provided_run_facets,
     is_operator_disabled,
     is_selective_lineage_enabled,
+    is_ti_rescheduled_already,
     print_warning,
 )
 from airflow.settings import configure_orm
@@ -134,6 +135,11 @@ def on_running():
             # we return here because Airflow 2.3 needs task from deferred state
             if task_instance.next_method is not None:
                 return
+
+            if is_ti_rescheduled_already(task_instance):
+                self.log.debug("Skipping this instance of rescheduled task - START event was emitted already")
+                return
+
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
                 logical_date=dagrun.logical_date,
@@ -143,11 +149,13 @@ def on_running():
                 logical_date = task_instance.logical_date
             else:
                 logical_date = task_instance.execution_date
+
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
                 try_number=task_instance.try_number,
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.RUNNING.value.lower()
             operator_name = task.task_type.lower()
@@ -231,6 +239,7 @@ def on_success():
                 task_id=task.task_id,
                 try_number=_get_try_number_success(task_instance),
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.COMPLETE.value.lower()
             operator_name = task.task_type.lower()
@@ -329,11 +338,13 @@ def on_failure():
                 logical_date = task_instance.logical_date
             else:
                 logical_date = task_instance.execution_date
+
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
                 try_number=task_instance.try_number,
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.FAIL.value.lower()
             operator_name = task.task_type.lower()
diff --git a/providers/src/airflow/providers/openlineage/plugins/macros.py b/providers/src/airflow/providers/openlineage/plugins/macros.py
index 69af17321e4bd..fd5194d3f3207 100644
--- a/providers/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/src/airflow/providers/openlineage/plugins/macros.py
@@ -67,6 +67,7 @@ def lineage_run_id(task_instance: TaskInstance):
         task_id=task_instance.task_id,
         try_number=task_instance.try_number,
         logical_date=logical_date,
+        map_index=task_instance.map_index,
     )


diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py
index 622f45bd65b29..eb1d5984d327d 100644
--- a/providers/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/src/airflow/providers/openlineage/utils/utils.py
@@ -30,6 +30,7 @@
 from deprecated import deprecated
 from openlineage.client.utils import RedactMixin
 from packaging.version import Version
+from sqlalchemy import exists

 from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import (
@@ -37,7 +38,7 @@
 )

 # TODO: move this maybe to Airflow's logic?
-from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
+from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, TaskReschedule
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
@@ -53,6 +54,7 @@
     is_dag_lineage_enabled,
     is_task_lineage_enabled,
 )
+from airflow.sensors.base import BaseSensorOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.context import AirflowContextDeprecationWarning
 from airflow.utils.log.secrets_masker import (
@@ -62,6 +64,7 @@
     should_hide_value_for_key,
 )
 from airflow.utils.module_loading import import_string
+from airflow.utils.session import NEW_SESSION, provide_session

 if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
@@ -184,6 +187,28 @@ def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
     raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")


+@provide_session
+def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
+    if not isinstance(ti.task, BaseSensorOperator):
+        return False
+
+    if not ti.task.reschedule:
+        return False
+
+    return (
+        session.query(
+            exists().where(
+                TaskReschedule.dag_id == ti.dag_id,
+                TaskReschedule.task_id == ti.task_id,
+                TaskReschedule.run_id == ti.run_id,
+                TaskReschedule.map_index == ti.map_index,
+                TaskReschedule.try_number == ti.try_number,
+            )
+        ).scalar()
+        is True
+    )
+
+
 class InfoJsonEncodable(dict):
     """
     Airflow objects might not be json-encodable overall.
@@ -217,6 +242,7 @@ def __init__(self, obj):
             self,
             **{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
         )
+        del self.obj

     @staticmethod
     def _cast_basic_types(value):
@@ -677,11 +703,11 @@ def decorator(f):
         def wrapper(*args, **kwargs):
             try:
                 return f(*args, **kwargs)
-            except Exception as e:
+            except Exception:
                 log.warning(
-                    "Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. If you see that, task has completed successfully despite not getting OL events."
+                    "OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.",
+                    exc_info=True,
                 )
-                log.warning(e)

         return wrapper

diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index 576f6df830c31..a9787a9399a3d 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -900,6 +900,7 @@ def test_build_task_instance_run_id_is_valid_uuid():
         task_id="task_id",
         try_number=1,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     uuid_result = uuid.UUID(result)
     assert uuid_result
@@ -912,28 +913,50 @@ def test_build_task_instance_run_id_same_input_gives_same_result():
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=-1,
     )
     result2 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag1",
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=-1,
     )
     assert result1 == result2


+def test_build_task_instance_run_id_different_map_index_gives_different_result():
+    result1 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=1,
+    )
+    result2 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=2,
+    )
+    assert result1 != result2
+
+
 def test_build_task_instance_run_id_different_inputs_gives_different_results():
     result1 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag1",
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     result2 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag2",
         task_id="task2",
         try_number=2,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     assert result1 != result2
diff --git a/providers/tests/openlineage/plugins/test_execution.py b/providers/tests/openlineage/plugins/test_execution.py
index 581054812127d..26aa09078cba8 100644
--- a/providers/tests/openlineage/plugins/test_execution.py
+++ b/providers/tests/openlineage/plugins/test_execution.py
@@ -63,7 +63,6 @@ def get_sorted_events(event_dir: str) -> list[str]:

 def has_value_in_events(events, chain, value):
     x = [get_from_nullable_chain(event, chain) for event in events]
-    log.error(x)
     y = [z == value for z in x]
     return any(y)

diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py
index d53c23c8b6dea..cbc4436ef0133 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -189,13 +189,12 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInstance]:
     """

     def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
+        return f"{logical_date.isoformat()}.{dag_id}"

-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
+    def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
+        return f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{map_index}"

     listener = OpenLineageListener()
-    listener.log = mock.Mock()
     listener.extractor_manager = mock.Mock()
     metadata = mock.Mock()
@@ -216,22 +215,25 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
     task_instance.dag_run.data_interval_start = None
     task_instance.dag_run.data_interval_end = None
     if AIRFLOW_V_3_0_PLUS:
-        task_instance.dag_run.logical_date = "2020-01-01T01:01:01"
+        task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     else:
-        task_instance.dag_run.execution_date = "2020-01-01T01:01:01"
+        task_instance.dag_run.execution_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     task_instance.task = mock.Mock()
     task_instance.task.task_id = "task_id"
     task_instance.task.dag = mock.Mock()
     task_instance.task.dag.dag_id = "dag_id"
     task_instance.task.dag.description = "Test DAG Description"
     task_instance.task.dag.owner = "Test Owner"
+    task_instance.task.inlets = []
+    task_instance.task.outlets = []
     task_instance.dag_id = "dag_id"
     task_instance.run_id = "dag_run_run_id"
     task_instance.try_number = 1
     task_instance.state = State.RUNNING
     task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
     task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
-    task_instance.logical_date = "2020-01-01T01:01:01"
+    task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
+    task_instance.map_index = -1
     task_instance.next_method = None  # Ensure this is None to reach start_task

     return listener, task_instance
@@ -258,8 +260,8 @@ def test_adapter_start_task_is_called_with_proper_arguments(
     correctly passed to the adapter. It also verifies that custom facets and Airflow run facets are
     correctly retrieved and included in the call. This ensures that all relevant data, including custom
     and Airflow-specific metadata, is accurately conveyed to the adapter during the initialization of a task,
-    reflecting the comprehensive tracking of task execution contexts.
-    """
+    reflecting the comprehensive tracking of task execution contexts."""
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
     mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
@@ -269,7 +271,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(

     listener.on_task_instance_running(None, task_instance, None)
     listener.adapter.start_task.assert_called_once_with(
-        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
         job_name="job_name",
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
@@ -291,7 +293,6 @@

 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
-@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@@ -300,7 +301,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     mock_get_job_name,
     mock_get_user_provided_run_facets,
     mock_get_airflow_run_facet,
-    mocked_adapter,
     mock_disabled,
     mock_debug_mode,
 ):
@@ -312,17 +312,9 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     failure events, thus confirming that the adapter's failure handling is functioning as expected.
     """

-    def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
-
-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
-
     listener, task_instance = _create_listener_and_task_instance()
-    task_instance.logical_date = "2020-01-01T01:01:01"
+    task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
-    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
     mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
@@ -339,7 +331,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="2020-01-01T01:01:01.dag_id",
-        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -372,16 +364,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
     during the task's lifecycle events.
     """

-    def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
-
-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
-
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    listener.adapter.build_dag_run_id.side_effect = mock_dag_id
-    listener.adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
     mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
@@ -396,7 +380,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="2020-01-01T01:01:01.dag_id",
-        run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
+        run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -419,8 +403,9 @@ def test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_method(
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=1,
+        map_index=-1,
     )


@@ -441,8 +426,9 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_method(
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=1,
+        map_index=-1,
     )


@@ -459,8 +445,9 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_method(
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=EXPECTED_TRY_NUMBER_1,
+        map_index=-1,
     )


@@ -701,8 +688,8 @@ def simple_callable(**kwargs):
             run_id=run_id,
             **triggered_by_kwargs,
         )  # type: ignore
-        self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id)
-        self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id)
+        self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, map_index=-1)
+        self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, map_index=-1)
         self.task_instance_1.dag_run = self.task_instance_2.dag_run = self.dagrun

     @pytest.mark.parametrize(
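The adapter change above makes map_index part of the deterministic seed for the task-instance run ID, so expanded instances of the same mapped task (same try number) no longer collide, while regular tasks keep the -1 sentinel used in the tests. A rough, illustrative sketch of that idea only, using the standard-library uuid.uuid5 instead of the provider's generate_static_uuid; the example_run_id helper and the namespace value are made up for illustration:

import uuid

def example_run_id(namespace: str, dag_id: str, task_id: str, try_number: int, map_index: int) -> str:
    # Same seed layout as the adapter change: map_index is appended to the hashed
    # string, so each expanded instance of a mapped task hashes to a different ID.
    seed = f"{namespace}.{dag_id}.{task_id}.{try_number}.{map_index}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, seed))

# Deterministic for identical inputs, distinct across map indexes.
assert example_run_id("default", "dag1", "task1", 1, 0) == example_run_id("default", "dag1", "task1", 1, 0)
assert example_run_id("default", "dag1", "task1", 1, 0) != example_run_id("default", "dag1", "task1", 1, 1)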