Skip to content

Commit

Permalink
move to dag_run.logical_date from execution date in OpenLineage provider
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski committed Sep 2, 2024
1 parent d59db11 commit d38e89c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
10 changes: 5 additions & 5 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ def _read_yaml_config(path: str) -> dict | None:
return yaml.safe_load(config_file)

@staticmethod
def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
def build_dag_run_id(dag_id: str, logical_date: datetime) -> str:
return str(
generate_static_uuid(
instant=execution_date,
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}".encode(),
)
)
Expand Down Expand Up @@ -357,7 +357,7 @@ def dag_started(
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
logical_date=dag_run.logical_date,
),
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
Expand All @@ -384,7 +384,7 @@ def dag_success(self, dag_run: DagRun, msg: str):
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
logical_date=dag_run.logical_date,
),
facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()},
),
Expand All @@ -408,7 +408,7 @@ def dag_failed(self, dag_run: DagRun, msg: str):
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
logical_date=dag_run.logical_date,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def on_running():
return
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
logical_date=dagrun.logical_date,
)

task_uuid = self.adapter.build_task_instance_run_id(
Expand Down Expand Up @@ -213,7 +213,7 @@ def on_task_instance_success(
def on_success():
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
logical_date=dagrun.logical_date,
)

task_uuid = OpenLineageAdapter.build_task_instance_run_id(
Expand Down Expand Up @@ -312,7 +312,7 @@ def _on_task_instance_failed(
def on_failure():
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
logical_date=dagrun.logical_date,
)

task_uuid = OpenLineageAdapter.build_task_instance_run_id(
Expand Down
12 changes: 6 additions & 6 deletions tests/providers/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,10 @@ def test_openlineage_adapter_stats_emit_failed(

def test_build_dag_run_id_is_valid_uuid():
dag_id = "test_dag"
execution_date = datetime.datetime.now()
logical_date = datetime.datetime.now()
result = OpenLineageAdapter.build_dag_run_id(
dag_id=dag_id,
execution_date=execution_date,
logical_date=logical_date,
)
uuid_result = uuid.UUID(result)
assert uuid_result
Expand All @@ -839,23 +839,23 @@ def test_build_dag_run_id_is_valid_uuid():
def test_build_dag_run_id_same_input_give_same_result():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
assert result1 == result2


def test_build_dag_run_id_different_inputs_give_different_results():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime.now(),
logical_date=datetime.datetime.now(),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag2",
execution_date=datetime.datetime.now(),
logical_date=datetime.datetime.now(),
)
assert result1 != result2

Expand Down
34 changes: 17 additions & 17 deletions tests/providers/openlineage/plugins/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
# Now you can use listener and task_instance in your tests to simulate their interaction.
"""

def mock_dag_id(dag_id, execution_date):
return f"{execution_date}.{dag_id}"
def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
Expand All @@ -197,7 +197,7 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
task_instance.dag_run.run_id = "dag_run_run_id"
task_instance.dag_run.data_interval_start = None
task_instance.dag_run.data_interval_end = None
task_instance.dag_run.execution_date = "execution_date"
task_instance.dag_run.execution_date = "logical_date"
task_instance.task = mock.Mock()
task_instance.task.task_id = "task_id"
task_instance.task.dag = mock.Mock()
Expand All @@ -210,7 +210,7 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
task_instance.state = State.RUNNING
task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
task_instance.execution_date = "execution_date"
task_instance.execution_date = "2020-01-01T01:01:01"
task_instance.next_method = None # Ensure this is None to reach start_task

return listener, task_instance
Expand Down Expand Up @@ -248,12 +248,12 @@ def test_adapter_start_task_is_called_with_proper_arguments(

listener.on_task_instance_running(None, task_instance, None)
listener.adapter.start_task.assert_called_once_with(
run_id="execution_date.dag_id.task_id.1",
run_id="2020-01-01T01:01:01.dag_id.task_id.1",
job_name="job_name",
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
parent_job_name="dag_id",
parent_run_id="execution_date.dag_id",
parent_run_id="2020-01-01T01:01:01.dag_id",
code_location=None,
nominal_start_time=None,
nominal_end_time=None,
Expand Down Expand Up @@ -291,8 +291,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
failure events, thus confirming that the adapter's failure handling is functioning as expected.
"""

def mock_dag_id(dag_id, execution_date):
return f"{execution_date}.{dag_id}"
def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
Expand All @@ -316,8 +316,8 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="execution_date.dag_id",
run_id="execution_date.dag_id.task_id.1",
parent_run_id="2020-01-01T01:01:01.dag_id",
run_id="2020-01-01T01:01:01.dag_id.task_id.1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
Expand Down Expand Up @@ -352,8 +352,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
during the task's lifecycle events.
"""

def mock_dag_id(dag_id, execution_date):
return f"{execution_date}.{dag_id}"
def mock_dag_id(dag_id, logical_date):
return f"{logical_date}.{dag_id}"

def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
Expand All @@ -375,8 +375,8 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="execution_date.dag_id",
run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
parent_run_id="2020-01-01T01:01:01.dag_id",
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
Expand All @@ -399,7 +399,7 @@ def test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met
listener.adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
execution_date="2020-01-01T01:01:01",
try_number=1,
)

Expand All @@ -422,7 +422,7 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth
mock_adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
execution_date="2020-01-01T01:01:01",
try_number=1,
)

Expand All @@ -441,7 +441,7 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met
mock_adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
execution_date="2020-01-01T01:01:01",
try_number=EXPECTED_TRY_NUMBER_1,
)

Expand Down

0 comments on commit d38e89c

Please sign in to comment.