[OpenLineage] Fix datasets in GCSDeleteObjectsOperator #39059

Merged: 1 commit, Apr 18, 2024
34 changes: 22 additions & 12 deletions airflow/providers/google/cloud/operators/gcs.py
@@ -297,7 +297,7 @@ def __init__(
         *,
         bucket_name: str,
         objects: list[str] | None = None,
-        prefix: str | None = None,
+        prefix: str | list[str] | None = None,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
@@ -309,12 +309,14 @@ def __init__(
         self.impersonation_chain = impersonation_chain

         if objects is None and prefix is None:
-            err_message = "(Task {task_id}) Either object or prefix should be set. Both are None.".format(
+            err_message = "(Task {task_id}) Either objects or prefix should be set. Both are None.".format(
                 **kwargs
             )
             raise ValueError(err_message)
+        if objects is not None and prefix is not None:
+            err_message = "(Task {task_id}) Objects or prefix should be set. Both provided.".format(**kwargs)
+            raise ValueError(err_message)

-        self._objects: list[str] = []
         super().__init__(**kwargs)

     def execute(self, context: Context) -> None:
@@ -324,15 +326,14 @@ def execute(self, context: Context) -> None:
         )

         if self.objects is not None:
-            self._objects = self.objects
+            objects = self.objects
         else:
-            self._objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix)
-        self.log.info("Deleting %s objects from %s", len(self._objects), self.bucket_name)
-        for object_name in self._objects:
+            objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix)
+        self.log.info("Deleting %s objects from %s", len(objects), self.bucket_name)
+        for object_name in objects:
             hook.delete(bucket_name=self.bucket_name, object_name=object_name)

-    def get_openlineage_facets_on_complete(self, task_instance):
-        """Implement on_complete as execute() resolves object names."""
+    def get_openlineage_facets_on_start(self):
         from openlineage.client.facet import (
             LifecycleStateChange,
             LifecycleStateChangeDatasetFacet,
@@ -342,8 +343,17 @@ def get_openlineage_facets_on_complete(self, task_instance):

         from airflow.providers.openlineage.extractors import OperatorLineage

-        if not self._objects:
-            return OperatorLineage()
+        objects = []
+        if self.objects is not None:
+            objects = self.objects
+        elif self.prefix is not None:
+            prefixes = [self.prefix] if isinstance(self.prefix, str) else self.prefix
+            for pref in prefixes:
+                # Use parent if not a file (dot not in name) and not a dir (ends with slash)
+                if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+                    pref = Path(pref).parent.as_posix()
+                pref = "/" if pref in (".", "", "/") else pref.rstrip("/")
+                objects.append(pref)

         bucket_url = f"gs://{self.bucket_name}"
         input_datasets = [
@@ -360,7 +370,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
                         )
                     },
                 )
-                for object_name in self._objects
+                for object_name in objects
             ]

         return OperatorLineage(inputs=input_datasets)
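The dataset-name resolution added in get_openlineage_facets_on_start is easier to read outside the diff, so here is a minimal standalone sketch of the same prefix normalization. The helper name is illustrative only and not part of this PR; the assertions mirror the parametrized expectations in the test change below.

from pathlib import Path


def normalize_prefix(pref: str) -> str:
    # Use parent if not a file (dot not in name) and not a dir (ends with slash)
    if "." not in pref.split("/")[-1] and not pref.endswith("/"):
        pref = Path(pref).parent.as_posix()
    # Collapse ".", "" and "/" to the bucket root; otherwise drop any trailing slash
    return "/" if pref in (".", "", "/") else pref.rstrip("/")


# Expected mappings, matching the parametrized test cases below
assert normalize_prefix("folder/a.txt") == "folder/a.txt"
assert normalize_prefix("dir/pre") == "dir"
assert normalize_prefix("dir/") == "dir"
assert normalize_prefix("pre") == "/"
assert normalize_prefix("dir/pre*") == "dir"
assert normalize_prefix("*") == "/"
assert normalize_prefix("") == "/"
assert normalize_prefix("/") == "/"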
61 changes: 36 additions & 25 deletions tests/providers/google/cloud/operators/test_gcs.py
@@ -172,48 +172,59 @@ def test_delete_prefix_as_empty_string(self, mock_hook):
             any_order=True,
         )

-    @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
-    def test_get_openlineage_facets_on_complete(self, mock_hook):
+    @pytest.mark.parametrize(
+        ("objects", "prefix", "inputs"),
+        (
+            (["folder/a.txt", "b.json"], None, ["folder/a.txt", "b.json"]),
+            (["folder/a.txt", "folder/b.json"], None, ["folder/a.txt", "folder/b.json"]),
+            (None, ["folder/a.txt", "b.json"], ["folder/a.txt", "b.json"]),
+            (None, "dir/pre", ["dir"]),
+            (None, ["dir/"], ["dir"]),
+            (None, "", ["/"]),
+            (None, "/", ["/"]),
+            (None, "pre", ["/"]),
+            (None, "dir/pre*", ["dir"]),
+            (None, "*", ["/"]),
+        ),
+        ids=(
+            "objects",
+            "multiple objects in the same dir",
+            "objects as prefixes",
+            "directory with prefix",
+            "directory",
+            "empty prefix",
+            "slash as prefix",
+            "prefix with no ending slash",
+            "directory with prefix with wildcard",
+            "just wildcard",
+        ),
+    )
+    def test_get_openlineage_facets_on_start(self, objects, prefix, inputs):
         bucket_url = f"gs://{TEST_BUCKET}"
         expected_inputs = [
             Dataset(
                 namespace=bucket_url,
-                name="folder/a.txt",
+                name=name,
                 facets={
                     "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
                         lifecycleStateChange=LifecycleStateChange.DROP.value,
                         previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
                             namespace=bucket_url,
-                            name="folder/a.txt",
+                            name=name,
                         ),
                     )
                 },
-            ),
-            Dataset(
-                namespace=bucket_url,
-                name="b.txt",
-                facets={
-                    "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
-                        lifecycleStateChange=LifecycleStateChange.DROP.value,
-                        previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
-                            namespace=bucket_url,
-                            name="b.txt",
-                        ),
-                    )
-                },
-            ),
+            )
+            for name in inputs
         ]

         operator = GCSDeleteObjectsOperator(
-            task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=["folder/a.txt", "b.txt"]
+            task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=objects, prefix=prefix
         )
-
-        operator.execute(None)
-
-        lineage = operator.get_openlineage_facets_on_complete(None)
-        assert len(lineage.inputs) == 2
+        lineage = operator.get_openlineage_facets_on_start()
+        assert len(lineage.inputs) == len(inputs)
         assert len(lineage.outputs) == 0
-        assert lineage.inputs == expected_inputs
+        assert sorted(lineage.inputs) == sorted(expected_inputs)


 class TestGoogleCloudStorageListOperator:
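For context, a minimal usage sketch of the operator after this change; the bucket name, task ids, and object paths below are made-up placeholders. With the widened signature, prefix may also be given as a list of strings, and the new constructor check rejects passing objects and prefix together.

from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator

# Delete explicitly named objects
delete_named = GCSDeleteObjectsOperator(
    task_id="delete_named_objects",
    bucket_name="my-example-bucket",
    objects=["folder/a.txt", "b.json"],
)

# Delete everything under one or more prefixes (prefix now accepts list[str] too)
delete_by_prefix = GCSDeleteObjectsOperator(
    task_id="delete_by_prefix",
    bucket_name="my-example-bucket",
    prefix=["staging/", "tmp/exports/"],
)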