Feature/add warnings to file watchers (#28206)
* Update 2.50 release notes to include new Kafka topicPattern feature

* Add warning for using match continuously

* add match continuously warning to python as well

* fix lint errors

* fix formatting again
johnjcasey authored Sep 1, 2023
1 parent 3591283 commit 3c93f7d
Showing 2 changed files with 21 additions and 0 deletions.
@@ -512,9 +512,18 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment)
* the watching frequency given by the {@code interval}. The pipeline will throw a {@code
* RuntimeError} if timestamp extraction for the matched file has failed, suggesting the
* timestamp metadata is not available with the IO connector.
*
* <p>Matching continuously scales poorly, as it is stateful, and requires storing file ids in
* memory. In addition, because it is memory-only, if a pipeline is restarted, already processed
* files will be reprocessed. Consider an alternate technique, such as <a
* href="https://cloud.google.com/storage/docs/pubsub-notifications">Pub/Sub Notifications</a>
* when using GCS if possible.
*/
public MatchConfiguration continuously(
Duration interval, TerminationCondition<String, ?> condition, boolean matchUpdatedFiles) {
LOG.warn(
"Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub "
+ "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible");
return toBuilder()
.setWatchInterval(interval)
.setWatchTerminationCondition(condition)
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/fileio.py
@@ -266,6 +266,13 @@ class MatchContinuously(beam.PTransform):
MatchContinuously is experimental. No backwards-compatibility
guarantees.
Matching continuously scales poorly, as it is stateful, and requires storing
file ids in memory. In addition, because it is memory-only, if a pipeline is
restarted, already processed files will be reprocessed. Consider an alternate
technique, such as Pub/Sub Notifications
(https://cloud.google.com/storage/docs/pubsub-notifications)
when using GCS if possible.
"""
def __init__(
self,
@@ -299,6 +306,11 @@ def __init__(
self.match_upd = match_updated_files
self.apply_windowing = apply_windowing
self.empty_match_treatment = empty_match_treatment
_LOGGER.warning(
'Matching Continuously is stateful, and can scale poorly. '
'Consider using Pub/Sub Notifications '
'(https://cloud.google.com/storage/docs/pubsub-notifications) '
'if possible')

def expand(self, pbegin) -> beam.PCollection[filesystem.FileMetadata]:
# invoke periodic impulse
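
The scaling and restart behaviour described in the new warnings can be illustrated with a toy model. This is a minimal sketch in plain Python, not Beam's implementation: the class `InMemoryFileWatcher`, its `poll` method, and the `gs://bucket/...` paths are hypothetical names chosen for illustration. It assumes only what the docstring states, namely that already-matched file ids are kept in memory.

```python
from typing import Iterable, List, Set


class InMemoryFileWatcher:
    """Toy poller that deduplicates matches with an in-memory set (hypothetical)."""

    def __init__(self) -> None:
        # One entry per file ever emitted. The set grows with the number of
        # matched files and is lost whenever the process restarts.
        self._seen: Set[str] = set()

    def poll(self, listing: Iterable[str]) -> List[str]:
        """Return the paths in `listing` that this instance has not emitted yet."""
        new_files = [path for path in listing if path not in self._seen]
        self._seen.update(new_files)
        return new_files

    @property
    def tracked(self) -> int:
        """Number of file ids currently held in memory."""
        return len(self._seen)


# Hypothetical file listings returned by two successive polls.
listing = ["gs://bucket/a.csv", "gs://bucket/b.csv"]
grown_listing = listing + ["gs://bucket/c.csv"]

watcher = InMemoryFileWatcher()
first_poll = watcher.poll(listing)          # both files are new
second_poll = watcher.poll(grown_listing)   # only c.csv is new

# Simulated restart: a fresh instance has no memory of earlier matches,
# so every file is emitted (and would be processed) again.
restarted = InMemoryFileWatcher()
after_restart = restarted.poll(grown_listing)
```

In this model the tracked set grows by one entry per matched file, and the restarted instance re-emits all three paths. An event-driven source such as Pub/Sub Notifications avoids both effects because new files arrive as messages and no listing has to be deduplicated.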