Skip to content

Commit

Permalink
openlineage: fix / add some task attributes in AirflowRunFacet (#40725)
Browse files (browse the repository at this point in the history)
* openlineage: Only include Airflow Datasets as inlets/outlets in AirflowRunFacet

Signed-off-by: Kacper Muda <[email protected]>

* openlineage: Add full path to operator in AirflowRunFacet

Signed-off-by: Kacper Muda <[email protected]>

* openlineage: Add documentation facet when translating from Table entity

Signed-off-by: Kacper Muda <[email protected]>

---------

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jul 12, 2024
1 parent 24b5007 commit ea18121
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
3 changes: 3 additions & 0 deletions airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
from openlineage.client.facet import (
BaseFacet,
DocumentationDatasetFacet,
OwnershipDatasetFacet,
OwnershipDatasetFacetOwners,
SchemaDatasetFacet,
Expand Down Expand Up @@ -231,6 +232,8 @@ def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
for user in table.owners
]
)
if table.description:
facets["documentation"] = DocumentationDatasetFacet(description=table.description)
return Dataset(
namespace=f"{table.cluster}",
name=f"{table.database}.{table.name}",
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION
from airflow.datasets import Dataset
from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
Expand Down Expand Up @@ -260,13 +261,14 @@ class TaskInfo(InfoJsonEncodable):
]
casts = {
"operator_class": lambda task: task.task_type,
"operator_class_path": lambda task: get_fully_qualified_class_name(task),
"task_group": lambda task: (
TaskGroupInfo(task.task_group)
if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
else None
),
"inlets": lambda task: [DatasetInfo(inlet) for inlet in task.inlets],
"outlets": lambda task: [DatasetInfo(outlet) for outlet in task.outlets],
"inlets": lambda task: [DatasetInfo(i) for i in task.inlets if isinstance(i, Dataset)],
"outlets": lambda task: [DatasetInfo(o) for o in task.outlets if isinstance(o, Dataset)],
}


Expand Down
3 changes: 3 additions & 0 deletions tests/providers/openlineage/extractors/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import pytest
from openlineage.client.facet import (
DocumentationDatasetFacet,
OwnershipDatasetFacet,
OwnershipDatasetFacetOwners,
SchemaDatasetFacet,
Expand Down Expand Up @@ -94,6 +95,7 @@ def test_convert_to_ol_dataset_from_table_with_columns_and_owners():
User(email="[email protected]", last_name="Smith"),
User(email="[email protected]"),
],
description="test description",
)
expected_facets = {
"schema": SchemaDatasetFacet(
Expand All @@ -118,6 +120,7 @@ def test_convert_to_ol_dataset_from_table_with_columns_and_owners():
OwnershipDatasetFacetOwners(name="user:<[email protected]>", type=""),
]
),
"documentation": DocumentationDatasetFacet(description="test description"),
}
result = ExtractorManager.convert_to_ol_dataset_from_table(table)
assert result.namespace == "c1"
Expand Down

0 comments on commit ea18121

Please sign in to comment.