Skip to content

Commit

Permalink
fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
Browse files Browse the repository at this point in the history

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Apr 18, 2024
1 parent a9d3364 commit 927e364
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 84 deletions.
8 changes: 3 additions & 5 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,6 @@ def get_openlineage_facets_on_complete(self, task_instance):
)
from openlineage.client.run import Dataset

from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
from airflow.providers.google.cloud.utils.openlineage import (
get_facets_from_bq_table,
get_identity_column_lineage_facet,
Expand All @@ -766,8 +765,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
"schema": output_dataset_facets["schema"],
}
input_datasets = []
for uri in sorted(self.source_uris):
bucket, blob = _parse_gcs_url(uri)
for blob in sorted(self.source_objects):
additional_facets = {}

if "*" in blob:
Expand All @@ -777,7 +775,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
"symlink": SymlinksDatasetFacet(
identifiers=[
SymlinksDatasetFacetIdentifiers(
namespace=f"gs://{bucket}", name=blob, type="file"
namespace=f"gs://{self.bucket}", name=blob, type="file"
)
]
),
Expand All @@ -788,7 +786,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
blob = "/"

dataset = Dataset(
namespace=f"gs://{bucket}",
namespace=f"gs://{self.bucket}",
name=blob,
facets=merge_dicts(input_dataset_facets, additional_facets),
)
Expand Down
34 changes: 22 additions & 12 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ def __init__(
self.source_object_required = source_object_required
self.exact_match = exact_match
self.match_glob = match_glob
self.resolved_source_objects: set[str] = set()
self.resolved_target_objects: set[str] = set()

def execute(self, context: Context):
hook = GCSHook(
Expand Down Expand Up @@ -540,13 +538,6 @@ def _copy_single_object(self, hook, source_object, destination_object):
self.destination_bucket,
destination_object,
)

self.resolved_source_objects.add(source_object)
if not destination_object:
self.resolved_target_objects.add(source_object)
else:
self.resolved_target_objects.add(destination_object)

hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object)

if self.move_object:
Expand All @@ -559,17 +550,36 @@ def get_openlineage_facets_on_complete(self, task_instance):
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
from pathlib import Path

from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors import OperatorLineage

def _process_prefix(pref):
if WILDCARD in pref:
pref = pref.split(WILDCARD)[0]
# Use parent if not a file (no dot in last segment) and not a dir (no trailing slash)
if "." not in pref.split("/")[-1] and not pref.endswith("/"):
pref = Path(pref).parent.as_posix()
return ["/" if pref in ("", "/", ".") else pref.rstrip("/")] # Adjust root path

inputs = []
for prefix in self.source_objects:
result = _process_prefix(prefix)
inputs.extend(result)

if self.destination_object is None:
outputs = inputs.copy()
else:
outputs = _process_prefix(self.destination_object)

return OperatorLineage(
inputs=[
Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(self.resolved_source_objects)
Dataset(namespace=f"gs://{self.source_bucket}", name=source) for source in sorted(set(inputs))
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
for target in sorted(self.resolved_target_objects)
for target in sorted(set(outputs))
],
)
207 changes: 140 additions & 67 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pytest
from openlineage.client.run import Dataset

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.transfers.gcs_to_gcs import WILDCARD, GCSToGCSOperator

TASK_ID = "test-gcs-to-gcs-operator"
Expand Down Expand Up @@ -829,74 +829,147 @@ def test_copy_files_into_a_folder(
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)

@pytest.mark.parametrize(
("source_objects", "destination_object", "inputs", "outputs"),
(
(
SOURCE_OBJECTS_SINGLE_FILE,
None,
[Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
),
(
SOURCE_OBJECTS_SINGLE_FILE,
"target.txt",
[Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="target.txt")],
),
(
SOURCE_OBJECTS_SINGLE_FILE,
"target_pre",
[Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
),
(
SOURCE_OBJECTS_SINGLE_FILE,
"dir/",
[Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir")],
),
(
SOURCE_OBJECTS_LIST,
"",
[
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
],
[
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
],
),
(
[*SOURCE_OBJECTS_LIST, "dir/*"],
"parent/pre_",
[
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="parent")],
),
(
SOURCE_OBJECTS_NO_FILE,
"no_ending_slash",
[Dataset(namespace=f"gs://{TEST_BUCKET}", name="/")],
[Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
),
(
[
f"dir/{SOURCE_OBJECT_WILDCARD_PREFIX}",
f"dir/{SOURCE_OBJECT_WILDCARD_SUFFIX}",
f"dir/{SOURCE_OBJECT_WILDCARD_MIDDLE}",
f"dir/{SOURCE_OBJECT_WILDCARD_FILENAME}",
"dir/*",
"dir/",
"dir/pre_",
],
"/",
[
Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
],
[
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
],
),
(
["", "dir/pre", SOURCE_OBJECTS_SINGLE_FILE[0]],
DESTINATION_OBJECT,
[
Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]),
],
[
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT_PREFIX),
],
),
(
[
"",
"dir/",
],
None,
[
Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
],
[
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir"),
],
),
),
ids=(
"single file without output",
"single file with single file output",
"single file with prefix output",
"single file with dir output",
"multiple file with empty output",
"multiple file with prefix as output",
"empty prefix with prefix as output",
"directory + prefix or wildcard without output",
"mixed prefixes and file paths with output dir",
"empty prefix + directory without output",
),
)
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_simple_reports_openlineage(self, mock_hook):
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECTS_SINGLE_FILE[0],
destination_bucket=DESTINATION_BUCKET,
)

operator.execute(None)

lineage = operator.get_openlineage_facets_on_complete(None)
assert len(lineage.inputs) == 1
assert len(lineage.outputs) == 1
assert lineage.inputs[0] == Dataset(
namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
)
assert lineage.outputs[0] == Dataset(
namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_multiple_reports_openlineage(self, mock_hook):
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_objects=SOURCE_OBJECTS_LIST,
destination_bucket=DESTINATION_BUCKET,
destination_object=DESTINATION_OBJECT,
)

operator.execute(None)

lineage = operator.get_openlineage_facets_on_complete(None)
assert len(lineage.inputs) == 3
assert len(lineage.outputs) == 1
assert lineage.inputs == [
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
]
assert lineage.outputs[0] == Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_wildcard_reports_openlineage(self, mock_hook):
mock_hook.return_value.list.return_value = [
"test_object1.txt",
"test_object2.txt",
]

operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
destination_bucket=DESTINATION_BUCKET,
destination_object=DESTINATION_OBJECT,
)
def test_get_openlineage_facets_on_complete(
self, mock_hook, source_objects, destination_object, inputs, outputs
):
if source_objects and any(WILDCARD in obj for obj in source_objects):
with pytest.warns(AirflowProviderDeprecationWarning, match="Usage of wildcard"):
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_objects=source_objects,
destination_bucket=DESTINATION_BUCKET,
destination_object=destination_object,
)
else:
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_objects=source_objects,
destination_bucket=DESTINATION_BUCKET,
destination_object=destination_object,
)

operator.execute(None)

lineage = operator.get_openlineage_facets_on_complete(None)
assert len(lineage.inputs) == 2
assert len(lineage.outputs) == 2
assert lineage.inputs == [
Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object1.txt"),
Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object2.txt"),
]
assert lineage.outputs == [
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/1.txt"),
Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/2.txt"),
]
assert len(lineage.inputs) == len(inputs)
assert len(lineage.outputs) == len(outputs)
assert sorted(lineage.inputs) == sorted(inputs)
assert sorted(lineage.outputs) == sorted(outputs)

0 comments on commit 927e364

Please sign in to comment.