diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 61237439727e..1436c459499e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -104,14 +104,19 @@ from typing import Union
 
 import apache_beam as beam
+from apache_beam.coders.coders import StrUtf8Coder
 from apache_beam.io import filesystem
 from apache_beam.io import filesystems
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.options.value_provider import ValueProvider
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.utils.annotations import experimental
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
 
 if TYPE_CHECKING:
   from apache_beam.transforms.window import BoundedWindow
@@ -120,6 +125,7 @@
     'EmptyMatchTreatment',
     'MatchFiles',
     'MatchAll',
+    'MatchContinuously',
     'ReadableFile',
     'ReadMatches'
 ]
@@ -243,6 +249,57 @@ def process(
     yield ReadableFile(metadata, self._compression)
 
 
+@experimental()
+class MatchContinuously(beam.PTransform):
+  """Checks for new files matching a given pattern at a regular interval.
+
+  This ``PTransform`` returns a ``PCollection`` of matching files in the form
+  of ``FileMetadata`` objects.
+  """
+  def __init__(
+      self,
+      file_pattern,
+      interval=360.0,
+      has_deduplication=True,
+      start_timestamp=Timestamp.now(),
+      stop_timestamp=MAX_TIMESTAMP):
+    """Initializes a MatchContinuously transform.
+
+    Args:
+      file_pattern: The file pattern to match against.
+      interval: Interval, in seconds, at which to check for new files.
+      has_deduplication: Whether to discard files that have already been
+        matched in a previous interval.
+      start_timestamp: Timestamp at which to start checking for files.
+      stop_timestamp: Timestamp after which no more files will be checked.
+    """
+
+    self.file_pattern = file_pattern
+    self.interval = interval
+    self.has_deduplication = has_deduplication
+    self.start_ts = start_timestamp
+    self.stop_ts = stop_timestamp
+
+  def expand(self, pcol):
+    impulse = pcol | PeriodicImpulse(
+        start_timestamp=self.start_ts,
+        stop_timestamp=self.stop_ts,
+        fire_interval=self.interval)
+
+    match_files = (
+        impulse
+        | 'GetFilePattern' >> beam.Map(lambda x: self.file_pattern)
+        | MatchAll())
+
+    if self.has_deduplication:
+      match_files = (
+          match_files
+          # Key by path so that each file gets its own state cell.
+          | 'ToKV' >> beam.Map(lambda x: (x.path, x))
+          | 'RemoveAlreadyRead' >> beam.ParDo(_RemoveDuplicates()))
+
+    return match_files
+
+
 class ReadMatches(beam.PTransform):
   """Converts each result of MatchFiles() or MatchAll() to a ReadableFile.
@@ -743,3 +800,20 @@ def finish_bundle(self):
             timestamp=key[1].start,
             windows=[key[1]]  # TODO(pabloem) HOW DO WE GET THE PANE
         ))
+
+
+class _RemoveDuplicates(beam.DoFn):
+
+  FILES_STATE = BagStateSpec('files', StrUtf8Coder())
+
+  def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
+    path = element[0]
+    file_metadata = element[1]
+    bag_content = [x for x in file_state.read()]
+
+    if not bag_content:
+      file_state.add(path)
+      _LOGGER.debug('Generated entry for file %s', path)
+      yield file_metadata
+    else:
+      _LOGGER.debug('File %s was already read', path)
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 18d419888790..a39584a13b13 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -48,6 +48,7 @@
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import IntervalWindow
+from apache_beam.utils.timestamp import Timestamp
 
 warnings.filterwarnings(
     'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
@@ -320,6 +321,71 @@ def test_transform_on_gcs(self):
         label='Assert Checksums')
 
 
+class MatchContinuouslyTest(_TestCaseWithTempDirCleanUp):
+  def test_with_deduplication(self):
+    files = []
+    tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+
+    # Create a file to be matched before the pipeline starts.
+    files.append(self._create_temp_file(dir=tempdir))
+    # Add the name of a file that will be created mid-pipeline.
+    files.append(FileSystems.join(tempdir, 'extra'))
+
+    interval = 0.2
+    start = Timestamp.now()
+    stop = start + interval + 0.1
+
+    def _create_extra_file(element):
+      writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
+      writer.close()
+      return element.path
+
+    with TestPipeline() as p:
+      match_continuously = (
+          p
+          | fileio.MatchContinuously(
+              file_pattern=FileSystems.join(tempdir, '*'),
+              interval=interval,
+              start_timestamp=start,
+              stop_timestamp=stop)
+          | beam.Map(_create_extra_file))
+
+      assert_that(match_continuously, equal_to(files))
+
+  def test_without_deduplication(self):
+    interval = 0.2
+    start = Timestamp.now()
+    stop = start + interval + 0.1
+
+    files = []
+    tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+
+    # Create a file to be matched before the pipeline starts.
+    file = self._create_temp_file(dir=tempdir)
+    # Add the file twice, since it will be matched in every interval.
+    files += [file, file]
+    # Add the name of a file that will be created mid-pipeline.
+    files.append(FileSystems.join(tempdir, 'extra'))
+
+    def _create_extra_file(element):
+      writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
+      writer.close()
+      return element.path
+
+    with TestPipeline() as p:
+      match_continuously = (
+          p
+          | fileio.MatchContinuously(
+              file_pattern=FileSystems.join(tempdir, '*'),
+              interval=interval,
+              has_deduplication=False,
+              start_timestamp=start,
+              stop_timestamp=stop)
+          | beam.Map(_create_extra_file))
+
+      assert_that(match_continuously, equal_to(files))
+
+
 class WriteFilesTest(_TestCaseWithTempDirCleanUp):
 
   SIMPLE_COLLECTION = [
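For context, here is a hypothetical usage sketch of the new transform in a streaming pipeline. The bucket path, interval, and downstream steps are illustrative only and not part of this PR; `ReadMatches` and `ReadableFile.read_utf8()` are the existing `fileio` APIs.

```python
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
  _ = (
      p
      # Poll for new files every minute; with deduplication enabled, each
      # discovered file is emitted downstream only once.
      | fileio.MatchContinuously(
          file_pattern='gs://my-bucket/incoming/*.json',  # assumed path
          interval=60.0,
          has_deduplication=True)
      | fileio.ReadMatches()  # FileMetadata -> ReadableFile
      | beam.FlatMap(lambda f: f.read_utf8().splitlines())
      | beam.Map(print))
```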
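The deduplication works because Beam scopes user state per key (and window): `_RemoveDuplicates` reads a `BagState` bag for the element's path, emits the file only when the bag is empty, and records the path otherwise. A minimal standalone sketch of that pattern, with invented names, runnable on the DirectRunner:

```python
import apache_beam as beam
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec

class KeepFirstPerKey(beam.DoFn):
  # One bag of strings per key; an empty bag means the key is new.
  SEEN = BagStateSpec('seen', StrUtf8Coder())

  def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
    key, value = element
    if not list(seen.read()):
      seen.add(key)
      yield value

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('a.txt', 1), ('a.txt', 2), ('b.txt', 3)])
      # Emits exactly one value per distinct key; which duplicate of
      # 'a.txt' wins is nondeterministic.
      | beam.ParDo(KeepFirstPerKey())
      | beam.Map(print))
```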