Skip to content

Commit

Permalink
Add MatchContinuously PTransform to Python SDK (#15106)
Browse files Browse the repository at this point in the history
* Add MatchContinuously PTransform

* Add MatchContinuously PTransform

* Add MatchContinuously PTransform

* Add MatchContinuously PTransform
  • Loading branch information
InigoSJ authored Jul 1, 2021
1 parent e4d27c1 commit 22ec4e6
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
74 changes: 74 additions & 0 deletions sdks/python/apache_beam/io/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,19 @@
from typing import Union

import apache_beam as beam
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io.filesystem import BeamIOError
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.annotations import experimental
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp

if TYPE_CHECKING:
from apache_beam.transforms.window import BoundedWindow
Expand All @@ -120,6 +125,7 @@
'EmptyMatchTreatment',
'MatchFiles',
'MatchAll',
'MatchContinuously',
'ReadableFile',
'ReadMatches'
]
Expand Down Expand Up @@ -243,6 +249,57 @@ def process(
yield ReadableFile(metadata, self._compression)


@experimental()
class MatchContinuously(beam.PTransform):
"""Checks for new files for a given pattern every interval.
This ``PTransform`` returns a ``PCollection`` of matching files in the form
of ``FileMetadata`` objects.
"""
def __init__(
self,
file_pattern,
interval=360.0,
has_deduplication=True,
start_timestamp=Timestamp.now(),
stop_timestamp=MAX_TIMESTAMP):
"""Initializes a MatchContinuously transform.
Args:
file_pattern: The file path to read from.
interval: Interval at which to check for files in seconds.
has_deduplication: Whether files already read are discarded or not.
start_timestamp: Timestamp for start file checking.
stop_timestamp: Timestamp after which no more files will be checked.
"""

self.file_pattern = file_pattern
self.interval = interval
self.has_deduplication = has_deduplication
self.start_ts = start_timestamp
self.stop_ts = stop_timestamp

def expand(self, pcol):
impulse = pcol | PeriodicImpulse(
start_timestamp=self.start_ts,
stop_timestamp=self.stop_ts,
fire_interval=self.interval)

match_files = (
impulse
| 'GetFilePattern' >> beam.Map(lambda x: self.file_pattern)
| MatchAll())

if self.has_deduplication:
match_files = (
match_files
# Making a Key Value so each file has its own state.
| 'ToKV' >> beam.Map(lambda x: (x.path, x))
| 'RemoveAlreadyRead' >> beam.ParDo(_RemoveDuplicates()))

return match_files


class ReadMatches(beam.PTransform):
"""Converts each result of MatchFiles() or MatchAll() to a ReadableFile.
Expand Down Expand Up @@ -743,3 +800,20 @@ def finish_bundle(self):
timestamp=key[1].start,
windows=[key[1]] # TODO(pabloem) HOW DO WE GET THE PANE
))


class _RemoveDuplicates(beam.DoFn):

FILES_STATE = BagStateSpec('files', StrUtf8Coder())

def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
path = element[0]
file_metadata = element[1]
bag_content = [x for x in file_state.read()]

if not bag_content:
file_state.add(path)
_LOGGER.debug('Generated entry for file %s', path)
yield file_metadata
else:
_LOGGER.debug('File %s was already read', path)
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/io/fileio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp

warnings.filterwarnings(
'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
Expand Down Expand Up @@ -320,6 +321,71 @@ def test_transform_on_gcs(self):
label='Assert Checksums')


class MatchContinuouslyTest(_TestCaseWithTempDirCleanUp):
def test_with_deduplication(self):
files = []
tempdir = '%s%s' % (self._new_tempdir(), os.sep)

# Create a file to be matched before pipeline
files.append(self._create_temp_file(dir=tempdir))
# Add file name that will be created mid-pipeline
files.append(FileSystems.join(tempdir, 'extra'))

interval = 0.2
start = Timestamp.now()
stop = start + interval + 0.1

def _create_extra_file(element):
writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
writer.close()
return element.path

with TestPipeline() as p:
match_continiously = (
p
| fileio.MatchContinuously(
file_pattern=FileSystems.join(tempdir, '*'),
interval=interval,
start_timestamp=start,
stop_timestamp=stop)
| beam.Map(_create_extra_file))

assert_that(match_continiously, equal_to(files))

def test_without_deduplication(self):
interval = 0.2
start = Timestamp.now()
stop = start + interval + 0.1

files = []
tempdir = '%s%s' % (self._new_tempdir(), os.sep)

# Create a file to be matched before pipeline starts
file = self._create_temp_file(dir=tempdir)
# Add file twice, since it will be matched for every interval
files += [file, file]
# Add file name that will be created mid-pipeline
files.append(FileSystems.join(tempdir, 'extra'))

def _create_extra_file(element):
writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
writer.close()
return element.path

with TestPipeline() as p:
match_continiously = (
p
| fileio.MatchContinuously(
file_pattern=FileSystems.join(tempdir, '*'),
interval=interval,
has_deduplication=False,
start_timestamp=start,
stop_timestamp=stop)
| beam.Map(_create_extra_file))

assert_that(match_continiously, equal_to(files))


class WriteFilesTest(_TestCaseWithTempDirCleanUp):

SIMPLE_COLLECTION = [
Expand Down

0 comments on commit 22ec4e6

Please sign in to comment.