From b2d4a69e4b7b4612425e53ef7326c22a4f819b30 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Fri, 13 Dec 2024 15:42:12 -0800
Subject: [PATCH] Migrate lineage counters to bounded tries.

---
 .../python/apache_beam/io/aws/s3filesystem.py |  2 +-
 .../apache_beam/io/aws/s3filesystem_test.py   |  3 +-
 .../io/azure/blobstoragefilesystem.py         |  2 +-
 .../io/azure/blobstoragefilesystem_test.py    |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py    |  2 +-
 .../apache_beam/io/gcp/gcsfilesystem.py       |  2 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py  |  3 +-
 sdks/python/apache_beam/metrics/metric.py     | 51 ++++++++++++++++---
 .../python/apache_beam/metrics/metric_test.py | 11 ++--
 9 files changed, 61 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py
index ffbce5893a96..494de14c83a8 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -325,4 +325,4 @@ def report_lineage(self, path, lineage, level=None):
         (len(components) > 1 and components[-1] == ''):
       # bucket only
       components = components[:-1]
-    lineage.add('s3', *components)
+    lineage.add('s3', *components, last_segment_sep='/')
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index 87403f482bd2..036727cd7a70 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -272,7 +272,8 @@ def test_lineage(self):
   def _verify_lineage(self, uri, expected_segments):
     lineage_mock = mock.MagicMock()
     self.fs.report_lineage(uri, lineage_mock)
-    lineage_mock.add.assert_called_once_with("s3", *expected_segments)
+    lineage_mock.add.assert_called_once_with(
+        "s3", *expected_segments, last_segment_sep='/')
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index 4495245dc54a..ff908451b1b7 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -328,4 +328,4 @@ def report_lineage(self, path, lineage, level=None):
         or(len(components) > 1 and components[-1] == ''):
       # bucket only
       components = components[:-1]
-    lineage.add('abs', *components)
+    lineage.add('abs', *components, last_segment_sep='/')
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index 138fe5f78b20..c3418e137e87 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -330,7 +330,8 @@ def test_lineage(self):
   def _verify_lineage(self, uri, expected_segments):
     lineage_mock = mock.MagicMock()
     self.fs.report_lineage(uri, lineage_mock)
-    lineage_mock.add.assert_called_once_with("abs", *expected_segments)
+    lineage_mock.add.assert_called_once_with(
+        "abs", *expected_segments, last_segment_sep='/')
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 11e0d098b2f3..9f60b5af6726 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
           self.table_reference.datasetId,
           self.table_reference.tableId)
       Lineage.sources().add(
-          "bigquery",
+          'bigquery',
           self.table_reference.projectId,
           self.table_reference.datasetId,
           self.table_reference.tableId)
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 325f70ddfd96..7e293ccd9d9f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -376,4 +376,4 @@ def report_lineage(self, path, lineage, level=None):
         or(len(components) > 1 and components[-1] == ''):
       # bucket only
       components = components[:-1]
-    lineage.add('gcs', *components)
+    lineage.add('gcs', *components, last_segment_sep='/')
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index ec7fa94b05fd..ade8529dcac8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -382,7 +382,8 @@ def test_lineage(self):
   def _verify_lineage(self, uri, expected_segments):
     lineage_mock = mock.MagicMock()
     self.fs.report_lineage(uri, lineage_mock)
-    lineage_mock.add.assert_called_once_with("gcs", *expected_segments)
+    lineage_mock.add.assert_called_once_with(
+        "gcs", *expected_segments, last_segment_sep='/')
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 33af25e20ca4..837a3686949c 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -33,6 +33,7 @@
 from typing import Dict
 from typing import FrozenSet
 from typing import Iterable
+from typing import Iterator
 from typing import List
 from typing import Optional
 from typing import Set
@@ -342,8 +343,8 @@ class Lineage:
   SINK = "sinks"
 
   _METRICS = {
-      SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE),
-      SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK)
+      SOURCE: Metrics.bounded_trie(LINEAGE_NAMESPACE, SOURCE),
+      SINK: Metrics.bounded_trie(LINEAGE_NAMESPACE, SINK)
   }
 
   def __init__(self, label: str) -> None:
@@ -392,8 +393,32 @@ def get_fq_name(
       return ':'.join((system, subtype, segs))
     return ':'.join((system, segs))
 
+  @staticmethod
+  def _get_fqn_parts(
+      system: str,
+      *segments: str,
+      subtype: Optional[str] = None,
+      last_segment_sep=None) -> Iterator[str]:
+    yield system + ':'
+    if subtype:
+      yield subtype + ':'
+    if segments:
+      for segment in segments[:-1]:
+        yield segment + '.'
+      if last_segment_sep:
+        sub_segments = segments[-1].split(last_segment_sep)
+        for sub_segment in sub_segments[:-1]:
+          yield sub_segment + last_segment_sep
+        yield sub_segments[-1]
+      else:
+        yield segments[-1]
+
   def add(
-      self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
+      self,
+      system: str,
+      *segments: str,
+      subtype: Optional[str] = None,
+      last_segment_sep=None) -> None:
     """
     Adds the given details as Lineage.
 
@@ -414,11 +439,21 @@ def add(
     The first positional argument serves as system, if full segments are
     provided, or the full FQN if it is provided as a single argument.
     """
-    system_or_details = system
-    if len(segments) == 0 and subtype is None:
-      self.metric.add(system_or_details)
-    else:
-      self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
+    self.add_raw(
+        *self._get_fqn_parts(
+            system,
+            *segments,
+            subtype=subtype,
+            last_segment_sep=last_segment_sep))
+
+  def add_raw(self, *rollup_segments: str):
+    """Adds the given fqn as lineage.
+
+    `rollup_segments` should be an iterable of strings whose concatenation
+    is a valid Dataplex FQN. In particular, this means they will often have
+    trailing delimiters.
+    """
+    self.metric.add(rollup_segments)
 
   @staticmethod
   def query(results: MetricResults, label: str) -> Set[str]:
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 524a2143172d..2e2e51b267a7 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -271,14 +271,19 @@ def test_fq_name(self):
 
   def test_add(self):
     lineage = Lineage(Lineage.SOURCE)
-    stringset = set()
+    added = set()
     # override
-    lineage.metric = stringset
+    lineage.metric = added
     lineage.add("s", "1", "2")
     lineage.add("s:3.4")
     lineage.add("s", "5", "6.7")
     lineage.add("s", "1", "2", subtype="t")
-    self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"})
+    lineage.add("sys", "seg1", "seg2", "seg3/part2/part3", last_segment_sep='/')
+    self.assertSetEqual(
+        added,
+        {('s:', '1.', '2'), ('s:3.4:', ), ('s:', '5.', '6.7'),
+         ('s:', 't:', '1.', '2'),
+         ('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')})
 
 
 if __name__ == '__main__':