From b3f505e3226e19357f9f3eafeded655fac159662 Mon Sep 17 00:00:00 2001 From: Yichi Zhang Date: Tue, 1 Jun 2021 09:53:02 -0700 Subject: [PATCH] [BEAM-12437] Fix broken test from missing allow_unsafe_triggers --- sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py | 4 +++- .../apache_beam/transforms/combinefn_lifecycle_pipeline.py | 2 ++ sdks/python/apache_beam/transforms/trigger_test.py | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 9eb59b51d507..96724314929d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -840,7 +840,9 @@ def test_bqfl_streaming(self): data=[(i, ) for i in range(100)]) args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True) + on_success_matcher=all_of(state_matcher, bq_matcher), + streaming=True, + allow_unsafe_triggers=True) with beam.Pipeline(argv=args) as p: stream_source = ( TestStream().advance_watermark_to(0).advance_processing_time( diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py index 5186e090b073..1964082e8dab 100644 --- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py +++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py @@ -102,6 +102,8 @@ def run_combine(pipeline, input_elements=5, lift_combiners=True): # Enable runtime type checking in order to cover TypeCheckCombineFn by # the test. pipeline.get_pipeline_options().view_as(TypeOptions).runtime_type_check = True + pipeline.get_pipeline_options().view_as( + TypeOptions).allow_unsafe_triggers = True with pipeline as p: pcoll = p | 'Start' >> beam.Create(range(input_elements)) diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 9e1a5694ab6d..ed43094088c1 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -32,6 +32,7 @@ from apache_beam import coders from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import TypeOptions from apache_beam.portability import common_urns from apache_beam.runners import pipeline_context from apache_beam.runners.direct.clock import TestClock @@ -1187,6 +1188,7 @@ def CheckAggregation(inputs_and_expected, aggregation): with TestPipeline() as p: # TODO(BEAM-8601): Pass this during pipeline construction. p._options.view_as(StandardOptions).streaming = True + p._options.view_as(TypeOptions).allow_unsafe_triggers = True # We can have at most one test stream per pipeline, so we share it. inputs_and_expected = p | read_test_stream