Skip to content

Commit

Permalink
utilize more information to deterministically generate OpenLineage run_id
Browse files Browse the repository at this point in the history

Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski committed Nov 19, 2024
1 parent 7a2fae0 commit 9cb56cc
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ async def get_artifacts_for_steps(steps, artifacts):
task_id=operator.task_id,
logical_date=_get_logical_date(task_instance),
try_number=_get_try_number(task_instance),
queued_dttm=task_instance.queued_dttm,
map_index=task_instance.map_index,
)

parent_job = ParentRunMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,13 @@ def build_task_instance_run_id(
task_id: str,
try_number: int,
logical_date: datetime,
queued_dttm: datetime,
map_index: int,
):
return str(
generate_static_uuid(
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{queued_dttm.isoformat()}.{map_index}".encode(),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,14 @@ def on_running():
logical_date = task_instance.logical_date
else:
logical_date = task_instance.execution_date

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -231,6 +234,8 @@ def on_success():
task_id=task.task_id,
try_number=_get_try_number_success(task_instance),
logical_date=logical_date,
queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -329,11 +334,14 @@ def on_failure():
logical_date = task_instance.logical_date
else:
logical_date = task_instance.execution_date

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
queued_dttm=task_instance.queued_dttm if task_instance.queued_dttm else logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()
Expand Down
2 changes: 2 additions & 0 deletions providers/src/airflow/providers/openlineage/plugins/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def lineage_run_id(task_instance: TaskInstance):
task_id=task_instance.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
queued_dttm=task_instance.queued_dttm,
map_index=task_instance.map_index,
)


Expand Down
6 changes: 3 additions & 3 deletions providers/src/airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,11 @@ def decorator(f):
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
except Exception:
log.warning(
"Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. If you see that, task has completed successfully despite not getting OL events."
"OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.",
exc_info=True,
)
log.warning(e)

return wrapper

Expand Down
30 changes: 30 additions & 0 deletions providers/tests/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,8 @@ def test_build_task_instance_run_id_is_valid_uuid():
task_id="task_id",
try_number=1,
logical_date=datetime.datetime.now(),
queued_dttm=datetime.datetime.now(),
map_index=-1,
)
uuid_result = uuid.UUID(result)
assert uuid_result
Expand All @@ -912,28 +914,56 @@ def test_build_task_instance_run_id_same_input_gives_same_result():
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=-1,
)
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
queued_dttm=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=-1,
)
assert result1 == result2


def test_build_task_instance_run_id_different_map_index_gives_different_result():
    """Run ids built from inputs that differ only in ``map_index`` must not be equal."""
    moment = datetime.datetime(2024, 1, 1, 1, 1, 1)
    # Everything except map_index is held constant across both calls.
    shared_kwargs = {
        "dag_id": "dag1",
        "task_id": "task1",
        "try_number": 1,
        "logical_date": moment,
        "queued_dttm": moment,
    }
    first_run_id, second_run_id = (
        OpenLineageAdapter.build_task_instance_run_id(**shared_kwargs, map_index=index)
        for index in (1, 2)
    )
    assert first_run_id != second_run_id


def test_build_task_instance_run_id_different_inputs_gives_different_results():
    """Run ids built from different dag id, task id and try number must not be equal.

    Both timestamps are taken from ``datetime.datetime.now()`` at call time, so they
    may differ between the two builds as well.
    """
    run_ids = [
        OpenLineageAdapter.build_task_instance_run_id(
            dag_id=f"dag{suffix}",
            task_id=f"task{suffix}",
            try_number=suffix,
            logical_date=datetime.datetime.now(),
            queued_dttm=datetime.datetime.now(),
            map_index=-1,
        )
        for suffix in (1, 2)
    ]
    assert run_ids[0] != run_ids[1]

Expand Down
1 change: 0 additions & 1 deletion providers/tests/openlineage/plugins/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def get_sorted_events(event_dir: str) -> list[str]:

def has_value_in_events(events, chain, value):
    """Return True if at least one event holds ``value`` at the location given by ``chain``.

    Each event is passed, together with ``chain``, to ``get_from_nullable_chain`` and the
    result is compared to ``value`` with ``==``.

    :param events: iterable of events to inspect.
    :param chain: lookup path forwarded unchanged to ``get_from_nullable_chain``.
    :param value: value to search for.
    :return: True when any extracted value equals ``value``, otherwise False
        (including when ``events`` is empty).
    """
    # The extracted values used to be dumped with log.error() on every call; that was
    # leftover debugging output reported at ERROR level, so it is intentionally gone.
    # The generator lets any() stop at the first matching event.
    return any(get_from_nullable_chain(event, chain) == value for event in events)

Expand Down
66 changes: 30 additions & 36 deletions providers/tests/openlineage/plugins/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,12 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
"""

def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"
return f"{logical_date.isoformat()}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, logical_date):
return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
def mock_task_id(dag_id, task_id, try_number, logical_date, queued_dttm, map_index):
return f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{queued_dttm.isoformat()}.{map_index}"

listener = OpenLineageListener()
listener.log = mock.Mock()
listener.extractor_manager = mock.Mock()

metadata = mock.Mock()
Expand All @@ -216,22 +215,26 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
task_instance.dag_run.data_interval_start = None
task_instance.dag_run.data_interval_end = None
if AIRFLOW_V_3_0_PLUS:
task_instance.dag_run.logical_date = "2020-01-01T01:01:01"
task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
else:
task_instance.dag_run.execution_date = "2020-01-01T01:01:01"
task_instance.dag_run.execution_date = dt.datetime(2020, 1, 1, 1, 1, 1)
task_instance.task = mock.Mock()
task_instance.task.task_id = "task_id"
task_instance.task.dag = mock.Mock()
task_instance.task.dag.dag_id = "dag_id"
task_instance.task.dag.description = "Test DAG Description"
task_instance.task.dag.owner = "Test Owner"
task_instance.task.inlets = []
task_instance.task.outlets = []
task_instance.dag_id = "dag_id"
task_instance.run_id = "dag_run_run_id"
task_instance.try_number = 1
task_instance.state = State.RUNNING
task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
task_instance.logical_date = "2020-01-01T01:01:01"
task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
task_instance.queued_dttm = dt.datetime(2023, 1, 3, 20, 20, 1)
task_instance.map_index = -1
task_instance.next_method = None # Ensure this is None to reach start_task

return listener, task_instance
Expand All @@ -258,8 +261,8 @@ def test_adapter_start_task_is_called_with_proper_arguments(
correctly passed to the adapter. It also verifies that custom facets and Airflow run facets are
correctly retrieved and included in the call. This ensures that all relevant data, including custom
and Airflow-specific metadata, is accurately conveyed to the adapter during the initialization of a task,
reflecting the comprehensive tracking of task execution contexts.
"""
reflecting the comprehensive tracking of task execution contexts."""

listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
Expand All @@ -269,7 +272,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(

listener.on_task_instance_running(None, task_instance, None)
listener.adapter.start_task.assert_called_once_with(
run_id="2020-01-01T01:01:01.dag_id.task_id.1",
run_id="2020-01-01T01:01:01.dag_id.task_id.1.2023-01-03T20:20:01.-1",
job_name="job_name",
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
Expand All @@ -291,7 +294,6 @@ def test_adapter_start_task_is_called_with_proper_arguments(

@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
Expand All @@ -300,7 +302,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
mock_get_job_name,
mock_get_user_provided_run_facets,
mock_get_airflow_run_facet,
mocked_adapter,
mock_disabled,
mock_debug_mode,
):
Expand All @@ -312,17 +313,9 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
failure events, thus confirming that the adapter's failure handling is functioning as expected.
"""

def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, logical_date):
return f"{logical_date}.{dag_id}.{task_id}.{try_number}"

listener, task_instance = _create_listener_and_task_instance()
task_instance.logical_date = "2020-01-01T01:01:01"
task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
mock_get_job_name.return_value = "job_name"
mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False
Expand All @@ -339,7 +332,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="2020-01-01T01:01:01.dag_id",
run_id="2020-01-01T01:01:01.dag_id.task_id.1",
run_id="2020-01-01T01:01:01.dag_id.task_id.1.2023-01-03T20:20:01.-1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
Expand Down Expand Up @@ -372,16 +365,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
during the task's lifecycle events.
"""

def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, logical_date):
return f"{logical_date}.{dag_id}.{task_id}.{try_number}"

listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
listener.adapter.build_dag_run_id.side_effect = mock_dag_id
listener.adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False
Expand All @@ -396,7 +381,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date):
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="2020-01-01T01:01:01.dag_id",
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.2023-01-03T20:20:01.-1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
Expand All @@ -419,8 +404,10 @@ def test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met
listener.adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
logical_date="2020-01-01T01:01:01",
logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
try_number=1,
queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1),
map_index=-1,
)


Expand All @@ -441,8 +428,10 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth
listener.adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
logical_date="2020-01-01T01:01:01",
logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
try_number=1,
queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1),
map_index=-1,
)


Expand All @@ -459,8 +448,10 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met
listener.adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
logical_date="2020-01-01T01:01:01",
logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
try_number=EXPECTED_TRY_NUMBER_1,
queued_dttm=dt.datetime(2023, 1, 3, 20, 20, 1),
map_index=-1,
)


Expand Down Expand Up @@ -701,9 +692,12 @@ def simple_callable(**kwargs):
run_id=run_id,
**triggered_by_kwargs,
) # type: ignore
self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id)
self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id)
self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, map_index=-1)
self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, map_index=-1)
self.task_instance_1.dag_run = self.task_instance_2.dag_run = self.dagrun
self.task_instance_2.queued_dttm = self.task_instance_1.queued_dttm = dt.datetime(
2023, 1, 1, 1, 10, 10
)

@pytest.mark.parametrize(
"selective_enable, enable_dag, expected_call_count",
Expand Down

0 comments on commit 9cb56cc

Please sign in to comment.