Merge pull request #33381: Migrate lineage counters to bounded tries.
robertwb authored Dec 19, 2024
2 parents 8fee3ca + 14f7caf commit e8df26f
Showing 15 changed files with 104 additions and 116 deletions.
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -18,6 +18,7 @@
"""S3 file system implementation for accessing files on AWS S3."""

# pytype: skip-file
import traceback

from apache_beam.io.aws import s3io
from apache_beam.io.filesystem import BeamIOError
@@ -315,14 +316,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL or \
(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('s3', *components)
lineage.add('s3', *components, last_segment_sep='/')
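
As a usage sketch, mirroring the updated test in s3filesystem_test.py just below, here is how the reworked report_lineage maps S3 URIs onto lineage segments. The URIs, the MagicMock lineage object, and constructing the filesystem with pipeline_options=None are illustrative assumptions (the apache-beam[aws] extra is needed), and the sketch assumes parse_s3_path splits a URI into bucket and key.

from unittest import mock

from apache_beam.io.aws.s3filesystem import S3FileSystem

fs = S3FileSystem(pipeline_options=None)
lineage = mock.MagicMock()

# Object path: bucket and key become separate segments, with the '/'-joined
# key flagged via last_segment_sep.
fs.report_lineage('s3://my-bucket/dir/file.txt', lineage)
lineage.add.assert_called_once_with(
    's3', 'my-bucket', 'dir/file.txt', last_segment_sep='/')

# Bucket-only path: the empty trailing component is dropped.
lineage.reset_mock()
fs.report_lineage('s3://my-bucket/', lineage)
lineage.add.assert_called_once_with('s3', 'my-bucket', last_segment_sep='/')
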
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -272,7 +272,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("s3", *expected_segments)
lineage_mock.add.assert_called_once_with(
"s3", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -18,6 +18,7 @@
"""Azure Blob Storage Implementation for accesing files on
Azure Blob Storage.
"""
import traceback

from apache_beam.io.azure import blobstorageio
from apache_beam.io.filesystem import BeamIOError
@@ -317,15 +318,14 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(
path, blob_optional=True, get_account=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('abs', *components)
lineage.add('abs', *components, last_segment_sep='/')
sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -330,7 +330,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("abs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"abs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
20 changes: 6 additions & 14 deletions sdks/python/apache_beam/io/filebasedsink.py
@@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

def _report_sink_lineage(self, dst_glob, dst_files):
"""
Report sink Lineage. Report every file if number of files no more than 100,
otherwise only report at directory level.
Report sink Lineage. Report every file if number of files no more than 10,
otherwise only report glob.
"""
if len(dst_files) <= 100:
# There is rollup at the higher level, but this loses glob information.
# Better to report multiple globs than just the parent directory.
if len(dst_files) <= 10:
for dst in dst_files:
FileSystems.report_sink_lineage(dst)
else:
dst = dst_glob
# dst_glob has a wildcard for shard number (see _shard_name_template)
sep = dst_glob.find('*')
if sep > 0:
dst = dst[:sep]
try:
dst, _ = FileSystems.split(dst)
except ValueError:
return # lineage report is fail-safe

FileSystems.report_sink_lineage(dst)
FileSystems.report_sink_lineage(dst_glob)

@check_accessible(['file_path_prefix'])
def finalize_write(
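
To make the rule in the docstring above concrete, here is a minimal standalone sketch of the same decision, with a stand-in report function in place of FileSystems.report_sink_lineage and illustrative destination names:

def report_sink_lineage(dst_glob, dst_files, report=print):
  # Report every concrete file when there are at most 10, otherwise report
  # the glob so the shard-number wildcard is preserved.
  if len(dst_files) <= 10:
    for dst in dst_files:
      report(dst)
  else:
    report(dst_glob)

# Few shards: each destination file is reported individually.
report_sink_lineage(
    'gs://bucket/out-*-of-00002',
    ['gs://bucket/out-00000-of-00002', 'gs://bucket/out-00001-of-00002'])

# Many shards: only the glob is reported.
report_sink_lineage(
    'gs://bucket/out-*-of-00100',
    ['gs://bucket/out-%05d-of-00100' % i for i in range(100)])
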
53 changes: 2 additions & 51 deletions sdks/python/apache_beam/io/filebasedsource.py
@@ -39,7 +39,6 @@
from apache_beam.io import range_trackers
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.options.value_provider import StaticValueProvider
@@ -170,37 +169,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
splittable=splittable)
single_file_sources.append(single_file_source)

self._report_source_lineage(files_metadata)
FileSystems.report_source_lineage(pattern)
self._concat_source = concat_source.ConcatSource(single_file_sources)

return self._concat_source

def _report_source_lineage(self, files_metadata):
"""
Report source Lineage. depend on the number of files, report full file
name, only dir, or only top level
"""
if len(files_metadata) <= 100:
for file_metadata in files_metadata:
FileSystems.report_source_lineage(file_metadata.path)
else:
size_track = set()
for file_metadata in files_metadata:
if len(size_track) >= 100:
FileSystems.report_source_lineage(
file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

try:
base, _ = FileSystems.split(file_metadata.path)
except ValueError:
pass
else:
size_track.add(base)

for base in size_track:
FileSystems.report_source_lineage(base)

def open_file(self, file_name):
return FileSystems.open(
file_name,
@@ -382,7 +355,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
self._report_source_lineage(metadata.path)
FileSystems.report_source_lineage(metadata.path)

splittable = (
self._splittable and _determine_splittability_from_compression_type(
@@ -397,28 +370,6 @@
metadata,
OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY))

def _report_source_lineage(self, path):
"""
Report source Lineage. Due to the size limit of Beam metrics, report full
file name or only top level depend on the number of files.
* Number of files<=100, report full file paths;
* Otherwise, report top level only.
"""
if self._size_track is None:
self._size_track = set()
elif len(self._size_track) == 0:
FileSystems.report_source_lineage(
path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

self._size_track.add(path)
FileSystems.report_source_lineage(path)

if len(self._size_track) >= 100:
self._size_track.clear()


class _ReadRange(DoFn):
def __init__(
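
For orientation, a hedged sketch of the two call sites this file keeps after the custom roll-up helpers are removed: the glob pattern is reported once when the concat source is built, and each matched file is reported from the splittable DoFn. The pattern is illustrative, and FileSystems.match needs access to the referenced bucket to actually run.

from apache_beam.io.filesystems import FileSystems

pattern = 'gs://my-bucket/logs/2024/*.json'  # illustrative

# _get_concat_source: the glob itself is reported once.
FileSystems.report_source_lineage(pattern)

# Splittable DoFn: every matched file is reported; per the PR title, any
# further roll-up is left to the bounded-trie backed Lineage metric.
for metadata in FileSystems.match([pattern])[0].metadata_list:
  FileSystems.report_source_lineage(metadata.path)
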
6 changes: 1 addition & 5 deletions sdks/python/apache_beam/io/filesystem.py
@@ -934,11 +934,7 @@ def delete(self, paths):
"""
raise NotImplementedError

class LineageLevel:
FILE = 'FILE'
TOP_LEVEL = 'TOP_LEVEL'

def report_lineage(self, path, unused_lineage, level=None):
def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.
16 changes: 5 additions & 11 deletions sdks/python/apache_beam/io/filesystems.py
@@ -391,27 +391,21 @@ def get_chunk_size(path):
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path, level=None):
def report_source_lineage(path):
"""
Report source :class:`~apache_beam.metrics.metric.LineageLevel`.
Report source :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path, level=None):
def report_sink_lineage(path):
"""
Report sink :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks())
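
A short usage sketch of the simplified helpers, with illustrative paths: each call resolves the filesystem from the path's scheme and forwards to its report_lineage with the matching Lineage set.

from apache_beam.io.filesystems import FileSystems
from apache_beam.metrics.metric import Lineage

FileSystems.report_source_lineage('gs://my-bucket/input/part-00000.txt')
FileSystems.report_sink_lineage('s3://my-bucket/output/results-00000-of-00010')

# Roughly equivalent to doing the dispatch by hand:
fs = FileSystems.get_filesystem('gs://my-bucket/input/part-00000.txt')
fs.report_lineage('gs://my-bucket/input/part-00000.txt', Lineage.sources())
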
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -26,6 +26,7 @@

# pytype: skip-file

import traceback
from typing import BinaryIO # pylint: disable=unused-import

from apache_beam.io.filesystem import BeamIOError
@@ -366,14 +367,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('gcs', *components)
lineage.add('gcs', *components, last_segment_sep='/')
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -382,7 +382,8 @@ def test_lineage(self):
def _verify_lineage(self, uri, expected_segments):
lineage_mock = mock.MagicMock()
self.fs.report_lineage(uri, lineage_mock)
lineage_mock.add.assert_called_once_with("gcs", *expected_segments)
lineage_mock.add.assert_called_once_with(
"gcs", *expected_segments, last_segment_sep='/')


if __name__ == '__main__':
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/localfilesystem.py
@@ -364,3 +364,6 @@ def try_delete(path):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage):
lineage.add('filesystem', 'localhost', path, last_segment_sep='/')
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/metrics/cells.py
@@ -811,6 +811,9 @@ def contains(self, value):
else:
return False

def flattened(self):
return self.as_trie().flattened()

def to_proto(self) -> metrics_pb2.BoundedTrie:
return metrics_pb2.BoundedTrie(
bound=self._bound,
@@ -822,7 +825,9 @@ def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData':
return BoundedTrieData(
bound=proto.bound,
singleton=tuple(proto.singleton) if proto.singleton else None,
root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None)
root=(
_BoundedTrieNode.from_proto(proto.root)
if proto.HasField('root') else None))

def as_trie(self):
if self._root is not None:
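
A hedged sketch of the presence check the from_proto change relies on: in the Python protobuf API an unset singular message field still reads back as a default sub-message, so HasField is the reliable way to decide whether root was populated. The field names come from the diff above; the import paths and values are assumptions.

from apache_beam.metrics.cells import BoundedTrieData
from apache_beam.portability.api import metrics_pb2

proto = metrics_pb2.BoundedTrie(bound=100, singleton=['gcs', 'bucket', 'obj'])
assert not proto.HasField('root')  # only the singleton was set

# from_proto now leaves root as None for this proto instead of building a
# node from the default (unset) sub-message.
data = BoundedTrieData.from_proto(proto)
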