Skip to content

Commit

Permalink
Merge pull request #14919 from y1chi/BEAM-12437
Browse files Browse the repository at this point in the history
[BEAM-12437] Fix broken test from missing allow_unsafe_triggers
  • Loading branch information
y1chi authored Jun 1, 2021
2 parents 94e637b + b3f505e commit 5a029fd
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 1 deletion.
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,9 @@ def test_bqfl_streaming(self):
data=[(i, ) for i in range(100)])

args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True)
on_success_matcher=all_of(state_matcher, bq_matcher),
streaming=True,
allow_unsafe_triggers=True)
with beam.Pipeline(argv=args) as p:
stream_source = (
TestStream().advance_watermark_to(0).advance_processing_time(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def run_combine(pipeline, input_elements=5, lift_combiners=True):
# Enable runtime type checking in order to cover TypeCheckCombineFn by
# the test.
pipeline.get_pipeline_options().view_as(TypeOptions).runtime_type_check = True
pipeline.get_pipeline_options().view_as(
TypeOptions).allow_unsafe_triggers = True

with pipeline as p:
pcoll = p | 'Start' >> beam.Create(range(input_elements))
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/transforms/trigger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.runners import pipeline_context
from apache_beam.runners.direct.clock import TestClock
Expand Down Expand Up @@ -1187,6 +1188,7 @@ def CheckAggregation(inputs_and_expected, aggregation):
with TestPipeline() as p:
# TODO(BEAM-8601): Pass this during pipeline construction.
p._options.view_as(StandardOptions).streaming = True
p._options.view_as(TypeOptions).allow_unsafe_triggers = True

# We can have at most one test stream per pipeline, so we share it.
inputs_and_expected = p | read_test_stream
Expand Down

0 comments on commit 5a029fd

Please sign in to comment.