Skip to content

Commit

Permalink
Remove file-specific lineage bounding.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Dec 17, 2024
1 parent 992b054 commit b76f5ca
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 97 deletions.
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = s3io.parse_s3_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL or \
(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('s3', *components, last_segment_sep='/')
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,14 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = blobstorageio.parse_azfs_path(
path, blob_optional=True, get_account=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('abs', *components, last_segment_sep='/')
20 changes: 6 additions & 14 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

def _report_sink_lineage(self, dst_glob, dst_files):
"""
Report sink Lineage. Report every file if number of files no more than 100,
otherwise only report at directory level.
Report sink Lineage. Report every file if the number of files is no more
than 10; otherwise report only the glob.
"""
if len(dst_files) <= 100:
# Lineage is also rolled up at a higher level, but that rollup loses glob
# information. It is better to report multiple globs than just the parent
# directory.
if len(dst_files) <= 10:
for dst in dst_files:
FileSystems.report_sink_lineage(dst)
else:
dst = dst_glob
# dst_glob has a wildcard for shard number (see _shard_name_template)
sep = dst_glob.find('*')
if sep > 0:
dst = dst[:sep]
try:
dst, _ = FileSystems.split(dst)
except ValueError:
return # lineage report is fail-safe

FileSystems.report_sink_lineage(dst)
FileSystems.report_sink_lineage(dst_glob)

@check_accessible(['file_path_prefix'])
def finalize_write(
Expand Down
52 changes: 2 additions & 50 deletions sdks/python/apache_beam/io/filebasedsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,37 +170,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource:
splittable=splittable)
single_file_sources.append(single_file_source)

self._report_source_lineage(files_metadata)
FileSystems.report_source_lineage(pattern)
self._concat_source = concat_source.ConcatSource(single_file_sources)

return self._concat_source

def _report_source_lineage(self, files_metadata):
"""
Report source Lineage. depend on the number of files, report full file
name, only dir, or only top level
"""
if len(files_metadata) <= 100:
for file_metadata in files_metadata:
FileSystems.report_source_lineage(file_metadata.path)
else:
size_track = set()
for file_metadata in files_metadata:
if len(size_track) >= 100:
FileSystems.report_source_lineage(
file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

try:
base, _ = FileSystems.split(file_metadata.path)
except ValueError:
pass
else:
size_track.add(base)

for base in size_track:
FileSystems.report_source_lineage(base)

def open_file(self, file_name):
return FileSystems.open(
file_name,
Expand Down Expand Up @@ -382,7 +356,7 @@ def process(self, element: Union[str, FileMetadata], *args,
match_results = FileSystems.match([element])
metadata_list = match_results[0].metadata_list
for metadata in metadata_list:
self._report_source_lineage(metadata.path)
FileSystems.report_source_lineage(metadata.path)

splittable = (
self._splittable and _determine_splittability_from_compression_type(
Expand All @@ -397,28 +371,6 @@ def process(self, element: Union[str, FileMetadata], *args,
metadata,
OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY))

def _report_source_lineage(self, path):
"""
Report source Lineage. Due to the size limit of Beam metrics, report full
file name or only top level depend on the number of files.
* Number of files<=100, report full file paths;
* Otherwise, report top level only.
"""
if self._size_track is None:
self._size_track = set()
elif len(self._size_track) == 0:
FileSystems.report_source_lineage(
path, level=FileSystem.LineageLevel.TOP_LEVEL)
return

self._size_track.add(path)
FileSystems.report_source_lineage(path)

if len(self._size_track) >= 100:
self._size_track.clear()


class _ReadRange(DoFn):
def __init__(
Expand Down
6 changes: 1 addition & 5 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,11 +934,7 @@ def delete(self, paths):
"""
raise NotImplementedError

class LineageLevel:
FILE = 'FILE'
TOP_LEVEL = 'TOP_LEVEL'

def report_lineage(self, path, unused_lineage, level=None):
def report_lineage(self, path, unused_lineage):
"""
Report Lineage metrics for path.
Expand Down
16 changes: 5 additions & 11 deletions sdks/python/apache_beam/io/filesystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,27 +391,21 @@ def get_chunk_size(path):
return filesystem.CHUNK_SIZE

@staticmethod
def report_source_lineage(path, level=None):
def report_source_lineage(path):
"""
Report source :class:`~apache_beam.metrics.metric.LineageLevel`.
Report source :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sources(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources())

@staticmethod
def report_sink_lineage(path, level=None):
def report_sink_lineage(path):
"""
Report sink :class:`~apache_beam.metrics.metric.Lineage`.
Args:
path: string path to be reported.
level: the level of file path. default to
:class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE.
"""
filesystem = FileSystems.get_filesystem(path)
filesystem.report_lineage(path, Lineage.sinks(), level=level)
FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks())
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,13 @@ def delete(self, paths):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
def report_lineage(self, path, lineage):
try:
components = gcsio.parse_gcs_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
traceback.print_exc()
return
if level == FileSystem.LineageLevel.TOP_LEVEL \
or(len(components) > 1 and components[-1] == ''):
# bucket only
if components and not components[-1]:
components = components[:-1]
lineage.add('gcs', *components, last_segment_sep='/')
7 changes: 2 additions & 5 deletions sdks/python/apache_beam/io/localfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,5 @@ def try_delete(path):
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

def report_lineage(self, path, lineage, level=None):
if level == FileSystem.LineageLevel.TOP_LEVEL:
lineage.add('filesystem', 'localhost')
else:
lineage.add('filesystem', 'localhost', path, last_segment_sep='/')
def report_lineage(self, path, lineage):
lineage.add('filesystem', 'localhost', path, last_segment_sep='/')

0 comments on commit b76f5ca

Please sign in to comment.