From 927e3643c2f901a3ac85f8dc94541ba83b3c6755 Mon Sep 17 00:00:00 2001
From: Kacper Muda
Date: Thu, 18 Apr 2024 16:11:14 +0200
Subject: [PATCH] fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)

Signed-off-by: Kacper Muda
---
 .../google/cloud/transfers/gcs_to_bigquery.py |   8 +-
 .../google/cloud/transfers/gcs_to_gcs.py      |  34 ++-
 .../google/cloud/transfers/test_gcs_to_gcs.py | 207 ++++++++++++------
 3 files changed, 165 insertions(+), 84 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 03aefcb8ad072..3899048dc419f 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -749,7 +749,6 @@ def get_openlineage_facets_on_complete(self, task_instance):
         )
         from openlineage.client.run import Dataset
 
-        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
         from airflow.providers.google.cloud.utils.openlineage import (
             get_facets_from_bq_table,
             get_identity_column_lineage_facet,
@@ -766,8 +765,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
             "schema": output_dataset_facets["schema"],
         }
         input_datasets = []
-        for uri in sorted(self.source_uris):
-            bucket, blob = _parse_gcs_url(uri)
+        for blob in sorted(self.source_objects):
             additional_facets = {}
 
             if "*" in blob:
@@ -777,7 +775,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
                     "symlink": SymlinksDatasetFacet(
                         identifiers=[
                             SymlinksDatasetFacetIdentifiers(
-                                namespace=f"gs://{bucket}", name=blob, type="file"
+                                namespace=f"gs://{self.bucket}", name=blob, type="file"
                             )
                         ]
                     ),
@@ -788,7 +786,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
                 blob = "/"
 
             dataset = Dataset(
-                namespace=f"gs://{bucket}",
+                namespace=f"gs://{self.bucket}",
                 name=blob,
                 facets=merge_dicts(input_dataset_facets, additional_facets),
             )
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 8d07dca58b7b4..0b3d330b65f94 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -234,8 +234,6 @@ def __init__(
         self.source_object_required = source_object_required
         self.exact_match = exact_match
         self.match_glob = match_glob
-        self.resolved_source_objects: set[str] = set()
-        self.resolved_target_objects: set[str] = set()
 
     def execute(self, context: Context):
         hook = GCSHook(
@@ -540,13 +538,6 @@ def _copy_single_object(self, hook, source_object, destination_object):
             self.destination_bucket,
             destination_object,
         )
-
-        self.resolved_source_objects.add(source_object)
-        if not destination_object:
-            self.resolved_target_objects.add(source_object)
-        else:
-            self.resolved_target_objects.add(destination_object)
-
         hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object)
 
         if self.move_object:
@@ -559,17 +550,36 @@ def get_openlineage_facets_on_complete(self, task_instance):
         This means we won't have to normalize self.source_object and self.source_objects,
         destination bucket and so on.
         """
+        from pathlib import Path
+
         from openlineage.client.run import Dataset
 
         from airflow.providers.openlineage.extractors import OperatorLineage
 
+        def _process_prefix(pref):
+            if WILDCARD in pref:
+                pref = pref.split(WILDCARD)[0]
+            # Use parent if not a file (dot not in name) and not a dir (ends with slash)
+            if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+                pref = Path(pref).parent.as_posix()
+            return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]  # Adjust root path
+
+        inputs = []
+        for prefix in self.source_objects:
+            result = _process_prefix(prefix)
+            inputs.extend(result)
+
+        if self.destination_object is None:
+            outputs = inputs.copy()
+        else:
+            outputs = _process_prefix(self.destination_object)
+
         return OperatorLineage(
             inputs=[
-                Dataset(namespace=f"gs://{self.source_bucket}", name=source)
-                for source in sorted(self.resolved_source_objects)
+                Dataset(namespace=f"gs://{self.source_bucket}", name=source) for source in sorted(set(inputs))
             ],
             outputs=[
                 Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
-                for target in sorted(self.resolved_target_objects)
+                for target in sorted(set(outputs))
             ],
         )
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index f961038f5d4d8..97e16e73665d2 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -23,7 +23,7 @@
 import pytest
 from openlineage.client.run import Dataset
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import WILDCARD, GCSToGCSOperator
 
 TASK_ID = "test-gcs-to-gcs-operator"
@@ -829,74 +829,147 @@ def test_copy_files_into_a_folder(
         ]
         mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
 
+    @pytest.mark.parametrize(
+        ("source_objects", "destination_object", "inputs", "outputs"),
+        (
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                None,
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "target.txt",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="target.txt")],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "target_pre",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "dir/",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir")],
+            ),
+            (
+                SOURCE_OBJECTS_LIST,
+                "",
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
+                ],
+                [
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+                ],
+            ),
+            (
+                [*SOURCE_OBJECTS_LIST, "dir/*"],
+                "parent/pre_",
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+                ],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="parent")],
+            ),
+            (
+                SOURCE_OBJECTS_NO_FILE,
+                "no_ending_slash",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name="/")],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+            ),
+            (
+                [
+                    f"dir/{SOURCE_OBJECT_WILDCARD_PREFIX}",
+                    f"dir/{SOURCE_OBJECT_WILDCARD_SUFFIX}",
f"dir/{SOURCE_OBJECT_WILDCARD_MIDDLE}", + f"dir/{SOURCE_OBJECT_WILDCARD_FILENAME}", + "dir/*", + "dir/", + "dir/pre_", + ], + "/", + [ + Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"), + ], + [ + Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"), + ], + ), + ( + ["", "dir/pre", SOURCE_OBJECTS_SINGLE_FILE[0]], + DESTINATION_OBJECT, + [ + Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"), + Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"), + Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]), + ], + [ + Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT_PREFIX), + ], + ), + ( + [ + "", + "dir/", + ], + None, + [ + Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"), + Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"), + ], + [ + Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"), + Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir"), + ], + ), + ), + ids=( + "single file without output", + "single file with single file output", + "single file with prefix output", + "single file with dir output", + "multiple file with empty output", + "multiple file with prefix as output", + "empty prefix with prefix as output", + "directory + prefix or wildcard without output", + "mixed prefixes and file paths with output dir", + "empty prefix + directory without output", + ), + ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") - def test_execute_simple_reports_openlineage(self, mock_hook): - operator = GCSToGCSOperator( - task_id=TASK_ID, - source_bucket=TEST_BUCKET, - source_object=SOURCE_OBJECTS_SINGLE_FILE[0], - destination_bucket=DESTINATION_BUCKET, - ) - - operator.execute(None) - - lineage = operator.get_openlineage_facets_on_complete(None) - assert len(lineage.inputs) == 1 - assert len(lineage.outputs) == 1 - assert lineage.inputs[0] == Dataset( - namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0] - ) - assert lineage.outputs[0] == Dataset( - namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0] - ) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") - def test_execute_multiple_reports_openlineage(self, mock_hook): - operator = GCSToGCSOperator( - task_id=TASK_ID, - source_bucket=TEST_BUCKET, - source_objects=SOURCE_OBJECTS_LIST, - destination_bucket=DESTINATION_BUCKET, - destination_object=DESTINATION_OBJECT, - ) - - operator.execute(None) - - lineage = operator.get_openlineage_facets_on_complete(None) - assert len(lineage.inputs) == 3 - assert len(lineage.outputs) == 1 - assert lineage.inputs == [ - Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]), - Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]), - Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]), - ] - assert lineage.outputs[0] == Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT) - - @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") - def test_execute_wildcard_reports_openlineage(self, mock_hook): - mock_hook.return_value.list.return_value = [ - "test_object1.txt", - "test_object2.txt", - ] - - operator = GCSToGCSOperator( - task_id=TASK_ID, - source_bucket=TEST_BUCKET, - source_object=SOURCE_OBJECT_WILDCARD_SUFFIX, - destination_bucket=DESTINATION_BUCKET, - destination_object=DESTINATION_OBJECT, - ) + def test_get_openlineage_facets_on_complete( + self, mock_hook, source_objects, destination_object, inputs, outputs + ): + if source_objects and any(WILDCARD in obj 
for obj in source_objects): + with pytest.warns(AirflowProviderDeprecationWarning, match="Usage of wildcard"): + operator = GCSToGCSOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_objects=source_objects, + destination_bucket=DESTINATION_BUCKET, + destination_object=destination_object, + ) + else: + operator = GCSToGCSOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_objects=source_objects, + destination_bucket=DESTINATION_BUCKET, + destination_object=destination_object, + ) operator.execute(None) lineage = operator.get_openlineage_facets_on_complete(None) - assert len(lineage.inputs) == 2 - assert len(lineage.outputs) == 2 - assert lineage.inputs == [ - Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object1.txt"), - Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object2.txt"), - ] - assert lineage.outputs == [ - Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/1.txt"), - Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/2.txt"), - ] + assert len(lineage.inputs) == len(inputs) + assert len(lineage.outputs) == len(outputs) + assert sorted(lineage.inputs) == sorted(inputs) + assert sorted(lineage.outputs) == sorted(outputs)
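
For reference, the snippet below is a standalone sketch (not part of the patch) that mirrors the _process_prefix helper added in gcs_to_gcs.py, so the mapping from source objects to OpenLineage dataset names can be tried without Airflow installed. The example object names ("data/file.txt" and friends) are made up, and WILDCARD is redefined locally to stand in for the constant imported in the operator module.

# Standalone sketch: mirrors the _process_prefix helper from the patch above.
# Example object names are made up; WILDCARD is a local stand-in for the
# constant that gcs_to_gcs.py imports.
from pathlib import Path

WILDCARD = "*"


def _process_prefix(pref: str) -> list[str]:
    # Keep only the part before the first wildcard, if any.
    if WILDCARD in pref:
        pref = pref.split(WILDCARD)[0]
    # Use parent if not a file (dot not in name) and not a dir (ends with slash).
    if "." not in pref.split("/")[-1] and not pref.endswith("/"):
        pref = Path(pref).parent.as_posix()
    # Collapse empty or root-like prefixes to "/"; otherwise drop a trailing slash.
    return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]


if __name__ == "__main__":
    for obj in ("data/file.txt", "data/pre_*", "data/", "data/pre", ""):
        print(f"{obj!r:>16} -> {_process_prefix(obj)}")
    # 'data/file.txt' -> ['data/file.txt']  (looks like a file, kept as-is)
    # 'data/pre_*'    -> ['data']           (wildcard stripped, then parent dir)
    # 'data/'         -> ['data']           (explicit directory)
    # 'data/pre'      -> ['data']           (bare prefix resolved to its parent)
    # ''              -> ['/']              (empty prefix means the whole bucket)

This is also the behavior the new parametrized test asserts: file-like paths stay full paths, wildcards and bare prefixes collapse to their parent directory, and an empty prefix becomes the bucket root "/".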