Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate lineage counters to bounded tries. #33381

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL or \
(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('s3', *components)
lineage.add('s3', *components, last_segment_sep='/')
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/aws/s3filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("s3", *expected_segments)
lineage_mock.add.assert_called_once_with(
"s3", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,14 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(
path, blob_optional=True, get_account=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('abs', *components)
lineage.add('abs', *components, last_segment_sep='/')
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("abs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"abs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
20 changes: 6 additions & 14 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

def _report_sink_lineage(self, dst_glob, dst_files):
"""
Report sink Lineage. Report every file if number of files no more than 100,
otherwise only report at directory level.
Report sink Lineage. Report every file if number of files no more than 10,
otherwise only report glob.
"""
if len(dst_files) <= 100:
# There is rollup at the higher level, but this loses glob information.
# Better to report multiple globs than just the parent directory.
if len(dst_files) <= 10:
for dst in dst_files:
FileSystems.report_sink_lineage(dst)
else:
dst = dst_glob
# dst_glob has a wildcard for shard number (see _shard_name_template)
sep = dst_glob.find('*')
if sep > 0:
dst = dst[:sep]
try:
dst, _ = FileSystems.split(dst)
except ValueError:
return # lineage report is fail-safe

FileSystems.report_sink_lineage(dst)
FileSystems.report_sink_lineage(dst_glob)

@check_accessible(['file_path_prefix'])
def finalize_write(
Expand Down
53 changes: 2 additions & 51 deletions sdks/python/apache_beam/io/filebasedsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from apache_beam.io import range_trackers
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.options.value_provider import StaticValueProvider
Expand Down Expand Up @@ -170,37 +169,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
splittable=splittable)
single_file_sources.append(single_file_source)

self._report_source_lineage(files_metadata)
FileSystems.report_source_lineage(pattern)
self._concat_source = concat_source.ConcatSource(single_file_sources)

return self._concat_source

def _report_source_lineage(self, files_metadata):
"""
Report source Lineage. depend on the number of files, report full file
name, only dir, or only top level
"""
if len(files_metadata) <= 100:
for file_metadata in files_metadata:
FileSystems.report_source_lineage(file_metadata.path)
else:
size_track = set()
for file_metadata in files_metadata:
if len(size_track) >= 100:
FileSystems.report_source_lineage(
file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

try:
base, _ = FileSystems.split(file_metadata.path)
except ValueError:
pass
else:
size_track.add(base)

for base in size_track:
FileSystems.report_source_lineage(base)

def open_file(self, file_name):
return FileSystems.open(
file_name,
Expand Down Expand Up @@ -382,7 +355,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
self._report_source_lineage(metadata.path)
FileSystems.report_source_lineage(metadata.path)

splittable = (
self._splittable and _determine_splittability_from_compression_type(
Expand All @@ -397,28 +370,6 @@ def process(self, element: Union[str, FileMetadata], *args,
metadata,
OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY))

def _report_source_lineage(self, path):
"""
Report source Lineage. Due to the size limit of Beam metrics, report full
file name or only top level depend on the number of files.

* Number of files<=100, report full file paths;

* Otherwise, report top level only.
"""
if self._size_track is None:
self._size_track = set()
elif len(self._size_track) == 0:
FileSystems.report_source_lineage(
path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

self._size_track.add(path)
FileSystems.report_source_lineage(path)

if len(self._size_track) >= 100:
self._size_track.clear()


class _ReadRange(DoFn):
def __init__(
Expand Down
6 changes: 1 addition & 5 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,11 +934,7 @@ def delete(self, paths):
"""
raise NotImplementedError

class LineageLevel:
FILE = 'FILE'
TOP_LEVEL = 'TOP_LEVEL'

def report_lineage(self, path, unused_lineage, level=None):
def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.
Expand Down
16 changes: 5 additions & 11 deletions sdks/python/apache_beam/io/filesystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,27 +391,21 @@ def get_chunk_size(path):
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path, level=None):
def report_source_lineage(path):
"""
Report source :class:`~apache_beam.metrics.metric.LineageLevel`.
Report source :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path, level=None):
def report_sink_lineage(path):
"""
Report sink :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks())
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
Expand Down
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('gcs', *components)
lineage.add('gcs', *components, last_segment_sep='/')
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("gcs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"gcs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/localfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,6 @@ def try_delete(path):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
  """Report a local path as lineage under the 'filesystem' system.

  Args:
    path: string path to report; it is forwarded unchanged as the final
      segment, after the fixed 'localhost' segment.
    lineage: object whose ``add`` method receives the report.
  """
  segments = ('filesystem', 'localhost', path)
  # '/' is handed over as the separator for the last segment (the path).
  lineage.add(*segments, last_segment_sep='/')
51 changes: 43 additions & 8 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from typing import Dict
from typing import FrozenSet
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
Expand Down Expand Up @@ -342,8 +343,8 @@ class Lineage:
SINK = "sinks"

_METRICS = {
SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE),
SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK)
SOURCE: Metrics.bounded_trie(LINEAGE_NAMESPACE, SOURCE),
SINK: Metrics.bounded_trie(LINEAGE_NAMESPACE, SINK)
}

def __init__(self, label: str) -> None:
Expand Down Expand Up @@ -392,8 +393,32 @@ def get_fq_name(
return ':'.join((system, subtype, segs))
return ':'.join((system, segs))

@staticmethod
def _get_fqn_parts(
    system: str,
    *segments: str,
    subtype: Optional[str] = None,
    last_segment_sep: Optional[str] = None) -> Iterator[str]:
  """Yield the pieces of a fully qualified name, delimiters attached.

  Every piece except the final one carries its trailing delimiter, so
  concatenating the yielded strings gives the full name.

  Args:
    system: first piece; always yielded with a trailing ':'.
    *segments: remaining name segments. All but the last are yielded with a
      trailing '.'.
    subtype: if truthy, yielded with a trailing ':' right after ``system``.
    last_segment_sep: if truthy, the last segment is split on this separator
      and every sub-piece but the final one is yielded with the separator
      re-attached.

  Yields:
    The name pieces, in order.
  """
  yield system + ':'
  if subtype:
    yield subtype + ':'

  if not segments:
    return

  *leading, tail = segments
  for part in leading:
    yield part + '.'

  if not last_segment_sep:
    yield tail
    return

  # str.split always returns at least one item, so ``final`` is always bound.
  *tail_prefixes, final = tail.split(last_segment_sep)
  for piece in tail_prefixes:
    yield piece + last_segment_sep
  yield final

def add(
self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
self,
system: str,
*segments: str,
subtype: Optional[str] = None,
last_segment_sep: Optional[str] = None) -> None:
"""
Adds the given details as Lineage.

Expand All @@ -414,11 +439,21 @@ def add(
The first positional argument serves as system, if full segments are
provided, or the full FQN if it is provided as a single argument.
"""
system_or_details = system
if len(segments) == 0 and subtype is None:
self.metric.add(system_or_details)
else:
self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
self.add_raw(
*self._get_fqn_parts(
system,
*segments,
subtype=subtype,
last_segment_sep=last_segment_sep))

def add_raw(self, *rollup_segments: str) -> None:
  """Record the given pre-split FQN pieces as one lineage entry.

  The pieces are passed to ``self.metric.add`` together, as a single tuple.

  Args:
    *rollup_segments: strings whose concatenation should be a valid Dataplex
      FQN. In particular, this means they will often carry trailing
      delimiters.
  """
  metric = self.metric
  metric.add(rollup_segments)

@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
Expand Down
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,19 @@ def test_fq_name(self):

def test_add(self):
lineage = Lineage(Lineage.SOURCE)
stringset = set()
added = set()
# override
lineage.metric = stringset
lineage.metric = added
lineage.add("s", "1", "2")
lineage.add("s:3.4")
lineage.add("s", "5", "6.7")
lineage.add("s", "1", "2", subtype="t")
self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"})
lineage.add("sys", "seg1", "seg2", "seg3/part2/part3", last_segment_sep='/')
self.assertSetEqual(
added,
{('s:', '1.', '2'), ('s:3.4:', ), ('s:', '5.', '6.7'),
('s:', 't:', '1.', '2'),
('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')})


if __name__ == '__main__':
Expand Down
Loading