From 8ef95f554d36c1f1fd60463ca791c6be4b10c080 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 29 Aug 2022 22:08:42 +0000 Subject: [PATCH 01/10] remove WaitForBQJobs and perform waits at each step's finish_bundle. copy jobs will provide trigger for delete stage --- .../apache_beam/io/gcp/bigquery_file_loads.py | 156 ++++++++---------- 1 file changed, 73 insertions(+), 83 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 41fdb6e7cc84..adbd926eddcc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -77,6 +77,9 @@ # triggering file write to avoid generating too many small files. _FILE_TRIGGERING_BATCHING_DURATION_SECS = 1 +# How many seconds we wait before polling a pending job +_SLEEP_DURATION_BETWEEN_POLLS = 10 + def _generate_job_name(job_name, job_type, step_name): return bigquery_tools.generate_bq_job_name( @@ -353,9 +356,10 @@ def __init__( self._step_name = step_name self._load_job_project_id = load_job_project_id - def setup(self): - self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client) + def start_bundle(self): + self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client) self._bq_io_metadata = create_bigquery_io_metadata(self._step_name) + self.pending_jobs = [] def display_data(self): return { @@ -390,7 +394,7 @@ def process(self, element, schema_mod_job_name_prefix): try: # Check if destination table exists - destination_table = self._bq_wrapper.get_table( + destination_table = self.bq_wrapper.get_table( project_id=table_reference.projectId, dataset_id=table_reference.datasetId, table_id=table_reference.tableId) @@ -402,7 +406,7 @@ def process(self, element, schema_mod_job_name_prefix): else: raise - temp_table_load_job = self._bq_wrapper.get_job( + temp_table_load_job = self.bq_wrapper.get_job( project=temp_table_load_job_reference.projectId, job_id=temp_table_load_job_reference.jobId, location=temp_table_load_job_reference.location) @@ -430,20 +434,34 @@ def process(self, element, schema_mod_job_name_prefix): table_reference) # Trigger potential schema modification by loading zero rows into the # destination table with the temporary table schema. - schema_update_job_reference = self._bq_wrapper.perform_load_job( - destination=table_reference, - source_stream=io.BytesIO(), # file with zero rows - job_id=job_name, - schema=temp_table_schema, - write_disposition='WRITE_APPEND', - create_disposition='CREATE_NEVER', - additional_load_parameters=additional_parameters, - job_labels=self._bq_io_metadata.add_additional_bq_job_labels(), - # JSON format is hardcoded because zero rows load(unlike AVRO) and - # a nested schema(unlike CSV, which a default one) is permitted. - source_format="NEWLINE_DELIMITED_JSON", - load_job_project_id=self._load_job_project_id) - yield (destination, schema_update_job_reference) + schema_update_job_reference = self.bq_wrapper.perform_load_job( + destination=table_reference, + source_stream=io.BytesIO(), # file with zero rows + job_id=job_name, + schema=temp_table_schema, + write_disposition='WRITE_APPEND', + create_disposition='CREATE_NEVER', + additional_load_parameters=additional_parameters, + job_labels=self._bq_io_metadata.add_additional_bq_job_labels(), + # JSON format is hardcoded because zero rows load(unlike AVRO) and + # a nested schema(unlike CSV, which a default one) is permitted. 
+ source_format="NEWLINE_DELIMITED_JSON", + load_job_project_id=self._load_job_project_id) + self.pending_jobs.append( + GlobalWindows.windowed_value( + (destination, schema_update_job_reference))) + + def finish_bundle(self): + # Unlike the other steps, schema update is not always necessary. + # In that case, return a None value to avoid blocking in streaming context + if not self.pending_jobs: + return [GlobalWindows.windowed_value(None)] + + for windowed_value in self.pending_jobs: + job_ref = windowed_value.value[1] + self.bq_wrapper.wait_for_bq_job( + job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS) + return self.pending_jobs class TriggerCopyJobs(beam.DoFn): @@ -460,6 +478,9 @@ class TriggerCopyJobs(beam.DoFn): copying from temp_tables to destination_table is not atomic. See: https://issues.apache.org/jira/browse/BEAM-7822 """ + + TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' + def __init__( self, create_disposition=None, @@ -486,6 +507,7 @@ def start_bundle(self): self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) if not self.bq_io_metadata: self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) + self.pending_jobs = [] def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None): destination = element[0] @@ -547,8 +569,19 @@ def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None): if wait_for_job: self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) + self.pending_jobs.append( + GlobalWindows.windowed_value((destination, job_reference))) + + def finish_bundle(self): + for windowed_value in self.pending_jobs: + job_ref = windowed_value.value[1] + self.bq_wrapper.wait_for_bq_job( + job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS) - yield (destination, job_reference) + yield pvalue.TaggedOutput( + TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, + GlobalWindows.windowed_value(None)) + return self.pending_jobs class TriggerLoadJobs(beam.DoFn): @@ -603,6 +636,7 @@ def start_bundle(self): self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) if not self.bq_io_metadata: self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) + self.pending_jobs = [] def process(self, element, load_job_name_prefix, *schema_side_inputs): # Each load job is assumed to have files respecting these constraints: @@ -676,7 +710,15 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs): source_format=self.source_format, job_labels=self.bq_io_metadata.add_additional_bq_job_labels(), load_job_project_id=self.load_job_project_id) - yield (destination, job_reference) + self.pending_jobs.append( + GlobalWindows.windowed_value((destination, job_reference))) + + def finish_bundle(self): + for windowed_value in self.pending_jobs: + job_ref = windowed_value.value[1] + self.bq_wrapper.wait_for_bq_job( + job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS) + return self.pending_jobs class PartitionFiles(beam.DoFn): @@ -733,29 +775,6 @@ def process(self, element): yield pvalue.TaggedOutput(output_tag, (destination, partition)) -class WaitForBQJobs(beam.DoFn): - """Takes in a series of BQ job names as side input, and waits for all of them. - - If any job fails, it will fail. If all jobs succeed, it will succeed. - - Experimental; no backwards compatibility guarantees. 
- """ - def __init__(self, test_client=None): - self.test_client = test_client - - def start_bundle(self): - self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client) - - def process(self, element, dest_ids_list): - job_references = [elm[1] for elm in dest_ids_list] - for ref in job_references: - # We must poll repeatedly until the job finishes or fails, thus setting - # max_retries to 0. - self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0) - - return dest_ids_list # Pass the list of destination-jobs downstream - - class DeleteTablesFn(beam.DoFn): def __init__(self, test_client=None): self.test_client = test_client @@ -1029,15 +1048,8 @@ def _load_data( temp_tables_load_job_ids_pc = trigger_loads_outputs['main'] temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES] - finished_temp_tables_load_jobs_pc = ( - p - | "ImpulseMonitorLoadJobs" >> beam.Create([None]) - | "WaitForTempTableLoadJobs" >> beam.ParDo( - WaitForBQJobs(self.test_client), - pvalue.AsList(temp_tables_load_job_ids_pc))) - schema_mod_job_ids_pc = ( - finished_temp_tables_load_jobs_pc + temp_tables_load_job_ids_pc | beam.ParDo( UpdateDestinationSchema( write_disposition=self.write_disposition, @@ -1047,15 +1059,8 @@ def _load_data( load_job_project_id=self.load_job_project_id), schema_mod_job_name_pcv)) - finished_schema_mod_jobs_pc = ( - p - | "ImpulseMonitorSchemaModJobs" >> beam.Create([None]) - | "WaitForSchemaModJobs" >> beam.ParDo( - WaitForBQJobs(self.test_client), - pvalue.AsList(schema_mod_job_ids_pc))) - - destination_copy_job_ids_pc = ( - finished_temp_tables_load_jobs_pc + copy_job_outputs = ( + temp_tables_load_job_ids_pc | beam.ParDo( TriggerCopyJobs( create_disposition=self.create_disposition, @@ -1064,25 +1069,17 @@ def _load_data( step_name=step_name, load_job_project_id=self.load_job_project_id), copy_job_name_pcv, - pvalue.AsIter(finished_schema_mod_jobs_pc))) + pvalue.AsIter(schema_mod_job_ids_pc)).with_outputs( + TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, main='main')) - finished_copy_jobs_pc = ( - p - | "ImpulseMonitorCopyJobs" >> beam.Create([None]) - | "WaitForCopyJobs" >> beam.ParDo( - WaitForBQJobs(self.test_client), - pvalue.AsList(destination_copy_job_ids_pc))) + destination_copy_job_ids_pc = copy_job_outputs['main'] + trigger_delete = copy_job_outputs[ + TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES] _ = ( - p - | "RemoveTempTables/Impulse" >> beam.Create([None]) - | "RemoveTempTables/PassTables" >> beam.FlatMap( - lambda _, - unused_copy_jobs, - deleting_tables: deleting_tables, - pvalue.AsIter(finished_copy_jobs_pc), - pvalue.AsIter(temp_tables_pc)) - | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None)) + temp_tables_pc + | "RemoveTempTables/AddUselessValue" >> beam.Map( + lambda x, unused_trigger: (x, None), pvalue.AsList(trigger_delete)) | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey() | "RemoveTempTables/GetTableNames" >> beam.Keys() | "RemoveTempTables/Delete" >> beam.ParDo( @@ -1105,13 +1102,6 @@ def _load_data( load_job_name_pcv, *self.schema_side_inputs)) - _ = ( - p - | "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None]) - | "WaitForDestinationLoadJobs" >> beam.ParDo( - WaitForBQJobs(self.test_client), - pvalue.AsList(destination_load_job_ids_pc))) - destination_load_job_ids_pc = ( (temp_tables_load_job_ids_pc, destination_load_job_ids_pc) | beam.Flatten()) From 67d9f3578b9508fff10a7309095a452bb45c1add Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 29 Aug 2022 22:11:24 +0000 Subject: [PATCH 02/10] 
clarify why we emit a None value when no schema updates needed --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index adbd926eddcc..bbd750f8d24a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -453,7 +453,8 @@ def process(self, element, schema_mod_job_name_prefix): def finish_bundle(self): # Unlike the other steps, schema update is not always necessary. - # In that case, return a None value to avoid blocking in streaming context + # In that case, return a None value to avoid blocking in streaming context. + # Otherwise, streaming pipeline would hang waiting for TriggerCopyJobs side-input. if not self.pending_jobs: return [GlobalWindows.windowed_value(None)] From ac161e86dc1fd0e71a6df9dc058fa9df7f27f4cf Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 30 Aug 2022 19:02:16 +0000 Subject: [PATCH 03/10] added test for copy jobs --- .../io/gcp/bigquery_file_loads_test.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 21798fae0fe7..81d46aefe89b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -946,6 +946,51 @@ def test_bqfl_streaming(self): .Method.FILE_LOADS, triggering_frequency=100)) + @pytest.mark.it_postcommit + def test_bqfl_streaming_with_copy_jobs(self): + if isinstance(self.test_pipeline.runner, TestDataflowRunner): + self.skipTest("TestStream is not supported on TestDataflowRunner") + output_table = '%s_%s' % (self.output_table, 'with_copy_jobs') + _LOGGER.info("output_table: %s", output_table) + _SIZE = 100 + schema = self.BIG_QUERY_STREAMING_SCHEMA + l = [{'Integr': i} for i in range(_SIZE)] + + state_matcher = PipelineStateMatcher(PipelineState.RUNNING) + bq_matcher = BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % output_table, + data=[(i,) for i in range(100)]) + + args = self.test_pipeline.get_full_options_as_args( + on_success_matcher=all_of(state_matcher, bq_matcher), + streaming=True, + allow_unsafe_triggers=True) + + # Override these parameters to induce copy jobs + bqfl._DEFAULT_MAX_FILE_SIZE = 100 + bqfl._MAXIMUM_LOAD_SIZE = 400 + + with beam.Pipeline(argv=args) as p: + stream_source = ( + TestStream().advance_watermark_to(0).advance_processing_time( + 100).add_elements(l[:_SIZE // 4]). 
+ advance_processing_time(100).advance_watermark_to(100).add_elements( + l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(200).add_elements( + l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(300).add_elements( + l[3 * _SIZE // 4:]).advance_processing_time( + 100).advance_watermark_to_infinity().advance_processing_time(100)) + + _ = (p + | stream_source + | bigquery.WriteToBigQuery(output_table, + schema=schema, + method=bigquery.WriteToBigQuery \ + .Method.FILE_LOADS, + triggering_frequency=100)) + @pytest.mark.it_postcommit def test_one_job_fails_all_jobs_fail(self): From 939ab82fb8ea487a2a64c98c765288b9fa57ed7f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 30 Aug 2022 21:26:39 +0000 Subject: [PATCH 04/10] add test for dynamic destination streaming --- .../io/gcp/bigquery_file_loads_test.py | 43 ++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 81d46aefe89b..3a77614cdc5b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -951,7 +951,6 @@ def test_bqfl_streaming_with_copy_jobs(self): if isinstance(self.test_pipeline.runner, TestDataflowRunner): self.skipTest("TestStream is not supported on TestDataflowRunner") output_table = '%s_%s' % (self.output_table, 'with_copy_jobs') - _LOGGER.info("output_table: %s", output_table) _SIZE = 100 schema = self.BIG_QUERY_STREAMING_SCHEMA l = [{'Integr': i} for i in range(_SIZE)] @@ -991,6 +990,48 @@ def test_bqfl_streaming_with_copy_jobs(self): .Method.FILE_LOADS, triggering_frequency=100)) + @pytest.mark.it_postcommit + def test_bqfl_streaming_with_dynamic_destinations(self): + if isinstance(self.test_pipeline.runner, TestDataflowRunner): + self.skipTest("TestStream is not supported on TestDataflowRunner") + self.output_table = "google.com:clouddfe:ahmedabualsaud_test.bqfl_stream_test" + prefix = self.output_table + output_table = lambda row: '%s_%s' % (prefix, f"dynamic_destination_{row['Integr'] % 2}") + _SIZE = 100 + schema = self.BIG_QUERY_STREAMING_SCHEMA + l = [{'Integr': i} for i in range(_SIZE)] + + state_matcher = PipelineStateMatcher(PipelineState.RUNNING) + bq_matcher = BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % output_table, + data=[(i,) for i in range(90)]) + + args = self.test_pipeline.get_full_options_as_args( + on_success_matcher=all_of(state_matcher, bq_matcher), + streaming=True, + allow_unsafe_triggers=True) + + with beam.Pipeline(argv=args) as p: + stream_source = ( + TestStream().advance_watermark_to(0).advance_processing_time( + 100).add_elements(l[:_SIZE // 4]). 
+ advance_processing_time(100).advance_watermark_to(100).add_elements( + l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(200).add_elements( + l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(300).add_elements( + l[3 * _SIZE // 4:]).advance_processing_time( + 100).advance_watermark_to_infinity().advance_processing_time(100)) + + _ = (p + | stream_source + | bigquery.WriteToBigQuery(output_table, + schema=schema, + method=bigquery.WriteToBigQuery \ + .Method.FILE_LOADS, + triggering_frequency=100)) + @pytest.mark.it_postcommit def test_one_job_fails_all_jobs_fail(self): From 39f94ed3fd6927ef7d7d28b6eb183ca7eb0a18ff Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 1 Sep 2022 21:26:24 +0000 Subject: [PATCH 05/10] properly check results with matchers --- .../io/gcp/bigquery_file_loads_test.py | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 3a77614cdc5b..34a80c64ee85 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -917,14 +917,13 @@ def test_bqfl_streaming(self): schema = self.BIG_QUERY_STREAMING_SCHEMA l = [{'Integr': i} for i in range(_SIZE)] - state_matcher = PipelineStateMatcher(PipelineState.RUNNING) bq_matcher = BigqueryFullResultStreamingMatcher( project=self.project, query="SELECT Integr FROM %s" % output_table, - data=[(i, ) for i in range(100)]) + data=[(i, ) for i in range(100)], + timeout=30) args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True, allow_unsafe_triggers=True) with beam.Pipeline(argv=args) as p: @@ -946,6 +945,8 @@ def test_bqfl_streaming(self): .Method.FILE_LOADS, triggering_frequency=100)) + hamcrest_assert(p, bq_matcher) + @pytest.mark.it_postcommit def test_bqfl_streaming_with_copy_jobs(self): if isinstance(self.test_pipeline.runner, TestDataflowRunner): @@ -955,14 +956,12 @@ def test_bqfl_streaming_with_copy_jobs(self): schema = self.BIG_QUERY_STREAMING_SCHEMA l = [{'Integr': i} for i in range(_SIZE)] - state_matcher = PipelineStateMatcher(PipelineState.RUNNING) bq_matcher = BigqueryFullResultStreamingMatcher( project=self.project, query="SELECT Integr FROM %s" % output_table, data=[(i,) for i in range(100)]) args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True, allow_unsafe_triggers=True) @@ -990,25 +989,32 @@ def test_bqfl_streaming_with_copy_jobs(self): .Method.FILE_LOADS, triggering_frequency=100)) + hamcrest_assert(p, bq_matcher) + @pytest.mark.it_postcommit def test_bqfl_streaming_with_dynamic_destinations(self): if isinstance(self.test_pipeline.runner, TestDataflowRunner): self.skipTest("TestStream is not supported on TestDataflowRunner") - self.output_table = "google.com:clouddfe:ahmedabualsaud_test.bqfl_stream_test" - prefix = self.output_table - output_table = lambda row: '%s_%s' % (prefix, f"dynamic_destination_{row['Integr'] % 2}") + # self.output_table = "ahmedabualsaud_test.bq_fl_stream_test" + even_table = '%s_%s' % (self.output_table, "dynamic_dest_0") + odd_table = '%s_%s' % (self.output_table, "dynamic_dest_1") + output_table = lambda row: even_table if (row['Integr'] % 2 == 0) else odd_table _SIZE = 100 schema = self.BIG_QUERY_STREAMING_SCHEMA l = [{'Integr': i} for i in 
range(_SIZE)] - state_matcher = PipelineStateMatcher(PipelineState.RUNNING) - bq_matcher = BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT Integr FROM %s" % output_table, - data=[(i,) for i in range(90)]) + pipeline_verifiers = [ + BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % even_table, + data=[(i,) for i in range(0, 100, 2)]), + BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % odd_table, + data=[(i,) for i in range(1, 100, 2)]) + ] args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=all_of(state_matcher, bq_matcher), streaming=True, allow_unsafe_triggers=True) @@ -1031,6 +1037,7 @@ def test_bqfl_streaming_with_dynamic_destinations(self): method=bigquery.WriteToBigQuery \ .Method.FILE_LOADS, triggering_frequency=100)) + hamcrest_assert(p, all_of(*pipeline_verifiers)) @pytest.mark.it_postcommit def test_one_job_fails_all_jobs_fail(self): From 66375633917ad938a39166c7a364ba17d4edac20 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 6 Sep 2022 18:14:27 +0000 Subject: [PATCH 06/10] style fixes --- .../apache_beam/io/gcp/bigquery_file_loads.py | 3 +- .../io/gcp/bigquery_file_loads_test.py | 68 +++++++++---------- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index bbd750f8d24a..1fe1e652b3c2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -454,7 +454,8 @@ def process(self, element, schema_mod_job_name_prefix): def finish_bundle(self): # Unlike the other steps, schema update is not always necessary. # In that case, return a None value to avoid blocking in streaming context. - # Otherwise, streaming pipeline would hang waiting for TriggerCopyJobs side-input. + # Otherwise, the streaming pipeline would get stuck waiting for the + # TriggerCopyJobs side-input. 
if not self.pending_jobs: return [GlobalWindows.windowed_value(None)] diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 34a80c64ee85..9ce1337d2558 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -44,8 +44,6 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner -from apache_beam.runners.runner import PipelineState -from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_stream import TestStream from apache_beam.testing.util import assert_that @@ -924,8 +922,7 @@ def test_bqfl_streaming(self): timeout=30) args = self.test_pipeline.get_full_options_as_args( - streaming=True, - allow_unsafe_triggers=True) + streaming=True, allow_unsafe_triggers=True) with beam.Pipeline(argv=args) as p: stream_source = ( TestStream().advance_watermark_to(0).advance_processing_time( @@ -957,13 +954,12 @@ def test_bqfl_streaming_with_copy_jobs(self): l = [{'Integr': i} for i in range(_SIZE)] bq_matcher = BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT Integr FROM %s" % output_table, - data=[(i,) for i in range(100)]) + project=self.project, + query="SELECT Integr FROM %s" % output_table, + data=[(i, ) for i in range(100)]) args = self.test_pipeline.get_full_options_as_args( - streaming=True, - allow_unsafe_triggers=True) + streaming=True, allow_unsafe_triggers=True) # Override these parameters to induce copy jobs bqfl._DEFAULT_MAX_FILE_SIZE = 100 @@ -971,15 +967,15 @@ def test_bqfl_streaming_with_copy_jobs(self): with beam.Pipeline(argv=args) as p: stream_source = ( - TestStream().advance_watermark_to(0).advance_processing_time( - 100).add_elements(l[:_SIZE // 4]). + TestStream().advance_watermark_to(0).advance_processing_time( + 100).add_elements(l[:_SIZE // 4]). advance_processing_time(100).advance_watermark_to(100).add_elements( - l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( - 100).advance_watermark_to(200).add_elements( - l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( - 100).advance_watermark_to(300).add_elements( - l[3 * _SIZE // 4:]).advance_processing_time( - 100).advance_watermark_to_infinity().advance_processing_time(100)) + l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(200).add_elements( + l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(300).add_elements( + l[3 * _SIZE // 4:]).advance_processing_time(100). 
+ advance_watermark_to_infinity().advance_processing_time(100)) _ = (p | stream_source @@ -998,37 +994,37 @@ def test_bqfl_streaming_with_dynamic_destinations(self): # self.output_table = "ahmedabualsaud_test.bq_fl_stream_test" even_table = '%s_%s' % (self.output_table, "dynamic_dest_0") odd_table = '%s_%s' % (self.output_table, "dynamic_dest_1") - output_table = lambda row: even_table if (row['Integr'] % 2 == 0) else odd_table + output_table = lambda row: even_table if ( + row['Integr'] % 2 == 0) else odd_table _SIZE = 100 schema = self.BIG_QUERY_STREAMING_SCHEMA l = [{'Integr': i} for i in range(_SIZE)] pipeline_verifiers = [ - BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT Integr FROM %s" % even_table, - data=[(i,) for i in range(0, 100, 2)]), - BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT Integr FROM %s" % odd_table, - data=[(i,) for i in range(1, 100, 2)]) + BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % even_table, + data=[(i, ) for i in range(0, 100, 2)]), + BigqueryFullResultStreamingMatcher( + project=self.project, + query="SELECT Integr FROM %s" % odd_table, + data=[(i, ) for i in range(1, 100, 2)]) ] args = self.test_pipeline.get_full_options_as_args( - streaming=True, - allow_unsafe_triggers=True) + streaming=True, allow_unsafe_triggers=True) with beam.Pipeline(argv=args) as p: stream_source = ( - TestStream().advance_watermark_to(0).advance_processing_time( - 100).add_elements(l[:_SIZE // 4]). + TestStream().advance_watermark_to(0).advance_processing_time( + 100).add_elements(l[:_SIZE // 4]). advance_processing_time(100).advance_watermark_to(100).add_elements( - l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( - 100).advance_watermark_to(200).add_elements( - l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( - 100).advance_watermark_to(300).add_elements( - l[3 * _SIZE // 4:]).advance_processing_time( - 100).advance_watermark_to_infinity().advance_processing_time(100)) + l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(200).add_elements( + l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time( + 100).advance_watermark_to(300).add_elements( + l[3 * _SIZE // 4:]).advance_processing_time(100). + advance_watermark_to_infinity().advance_processing_time(100)) _ = (p | stream_source From 71b0403e3ec98bae9812e714ff761918f9cca42c Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 8 Sep 2022 18:42:55 +0000 Subject: [PATCH 07/10] fixing test_one_job... 
test --- .../io/gcp/bigquery_file_loads_test.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 9ce1337d2558..67b8e78ef186 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -577,12 +577,15 @@ def test_wait_for_job_completion(self, sleep_mock): sleep_mock.assert_called_once() @mock.patch('time.sleep') - def test_one_job_failed_after_waiting(self, sleep_mock): - job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()] - job_references[0].projectId = 'project1' - job_references[0].jobId = 'jobId1' - job_references[1].projectId = 'project1' - job_references[1].jobId = 'jobId2' + def test_one_load_job_failed_after_waiting(self, sleep_mock): + job_1 = bigquery_api.Job() + job_1.jobReference = bigquery_api.JobReference() + job_1.jobReference.projectId = 'project1' + job_1.jobReference.jobId = 'jobId1' + job_2 = bigquery_api.Job() + job_2.jobReference = bigquery_api.JobReference() + job_2.jobReference.projectId = 'project1' + job_2.jobReference.jobId = 'jobId2' job_1_waiting = mock.Mock() job_1_waiting.status.state = 'RUNNING' @@ -598,16 +601,17 @@ def test_one_job_failed_after_waiting(self, sleep_mock): bq_client.jobs.Get.side_effect = [ job_1_waiting, job_2_done, job_1_error, job_2_done ] - - waiting_dofn = bqfl.WaitForBQJobs(bq_client) - - dest_list = [(i, job) for i, job in enumerate(job_references)] + partition_1 = ('project:dataset.table0', ['file0']) + partition_2 = ('project:dataset.table1', ['file1']) + bq_client.jobs.Insert.side_effect = [job_1, job_2] + test_job_prefix = "test_job" with self.assertRaises(Exception): with TestPipeline('DirectRunner') as p: - references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list)) - _ = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references)) - + partitions = p | beam.Create([partition_1, partition_2]) + _ = (partitions + | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix)) + sleep_mock.assert_called_once() def test_multiple_partition_files(self): From dae755fe2abe602d15448d108a14b6ad84482bb9 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 8 Sep 2022 19:31:54 +0000 Subject: [PATCH 08/10] fixing test_wait_for_job_completion --- .../io/gcp/bigquery_file_loads_test.py | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 67b8e78ef186..9172a13c46fd 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -542,12 +542,15 @@ def test_load_job_id_use_for_copy_job(self): label='CheckCopyJobProjectIds') @mock.patch('time.sleep') - def test_wait_for_job_completion(self, sleep_mock): - job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()] - job_references[0].projectId = 'project1' - job_references[0].jobId = 'jobId1' - job_references[1].projectId = 'project1' - job_references[1].jobId = 'jobId2' + def test_wait_for_load_job_completion(self, sleep_mock): + job_1 = bigquery_api.Job() + job_1.jobReference = bigquery_api.JobReference() + job_1.jobReference.projectId = 'project1' + job_1.jobReference.jobId = 'jobId1' + job_2 = bigquery_api.Job() + job_2.jobReference = 
bigquery_api.JobReference() + job_2.jobReference.projectId = 'project1' + job_2.jobReference.jobId = 'jobId2' job_1_waiting = mock.Mock() job_1_waiting.status.state = 'RUNNING' @@ -563,16 +566,20 @@ def test_wait_for_job_completion(self, sleep_mock): bq_client.jobs.Get.side_effect = [ job_1_waiting, job_2_done, job_1_done, job_2_done ] + partition_1 = ('project:dataset.table0', ['file0']) + partition_2 = ('project:dataset.table1', ['file1']) + bq_client.jobs.Insert.side_effect = [job_1, job_2] + test_job_prefix = "test_job" - waiting_dofn = bqfl.WaitForBQJobs(bq_client) - - dest_list = [(i, job) for i, job in enumerate(job_references)] - + expected_dest_jobref_list = [(partition_1[0], job_1.jobReference), + (partition_2[0], job_2.jobReference)] with TestPipeline('DirectRunner') as p: - references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list)) - outputs = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references)) + partitions = p | beam.Create([partition_1, partition_2]) + outputs = (partitions + | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), + test_job_prefix)) - assert_that(outputs, equal_to(dest_list)) + assert_that(outputs, equal_to(expected_dest_jobref_list)) sleep_mock.assert_called_once() @@ -610,8 +617,9 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock): with TestPipeline('DirectRunner') as p: partitions = p | beam.Create([partition_1, partition_2]) _ = (partitions - | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix)) - + | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), + test_job_prefix)) + sleep_mock.assert_called_once() def test_multiple_partition_files(self): From 50c50e9f53d2a4213b92c9c120e85cb939035cf2 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 9 Sep 2022 19:42:28 +0000 Subject: [PATCH 09/10] yield copy job references instead. having trouble returning and yielding to separate outputs in the same finish_bundle --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 3 +-- sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 1fe1e652b3c2..1e78a0e8e2f9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -579,12 +579,11 @@ def finish_bundle(self): job_ref = windowed_value.value[1] self.bq_wrapper.wait_for_bq_job( job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS) + yield windowed_value yield pvalue.TaggedOutput( TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, GlobalWindows.windowed_value(None)) - return self.pending_jobs - class TriggerLoadJobs(beam.DoFn): """Triggers the import jobs to BQ. 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 9172a13c46fd..016dc1d8bed3 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -1003,7 +1003,6 @@ def test_bqfl_streaming_with_copy_jobs(self): def test_bqfl_streaming_with_dynamic_destinations(self): if isinstance(self.test_pipeline.runner, TestDataflowRunner): self.skipTest("TestStream is not supported on TestDataflowRunner") - # self.output_table = "ahmedabualsaud_test.bq_fl_stream_test" even_table = '%s_%s' % (self.output_table, "dynamic_dest_0") odd_table = '%s_%s' % (self.output_table, "dynamic_dest_1") output_table = lambda row: even_table if ( From 1e39863fa4ec84f19b3fd5b07806e6bfbf512c72 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 12 Sep 2022 03:45:03 +0000 Subject: [PATCH 10/10] style fixes --- .../apache_beam/io/gcp/bigquery_file_loads.py | 1 + .../apache_beam/io/gcp/bigquery_file_loads_test.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 1e78a0e8e2f9..1e964c051bd1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -585,6 +585,7 @@ def finish_bundle(self): TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, GlobalWindows.windowed_value(None)) + class TriggerLoadJobs(beam.DoFn): """Triggers the import jobs to BQ. diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 016dc1d8bed3..77fb554bedc6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -575,9 +575,10 @@ def test_wait_for_load_job_completion(self, sleep_mock): (partition_2[0], job_2.jobReference)] with TestPipeline('DirectRunner') as p: partitions = p | beam.Create([partition_1, partition_2]) - outputs = (partitions - | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), - test_job_prefix)) + outputs = ( + partitions + | beam.ParDo( + bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix)) assert_that(outputs, equal_to(expected_dest_jobref_list)) @@ -616,9 +617,10 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock): with self.assertRaises(Exception): with TestPipeline('DirectRunner') as p: partitions = p | beam.Create([partition_1, partition_2]) - _ = (partitions - | beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client), - test_job_prefix)) + _ = ( + partitions + | beam.ParDo( + bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix)) sleep_mock.assert_called_once()
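---

For readers skimming the series, below is a minimal sketch (not part of any patch) of the pattern these commits introduce across `TriggerLoadJobs`, `UpdateDestinationSchema`, and `TriggerCopyJobs`: start BigQuery jobs in `process`, buffer their references, and only wait on and emit them in `finish_bundle`, so downstream stages see a destination only after its job has completed. The `start_job` and `wait_for_job` helpers are hypothetical stand-ins for `BigQueryWrapper.perform_load_job` / `wait_for_bq_job`, and the element shape is an assumption for illustration.

```python
# Illustrative sketch of the deferred-wait pattern from this series.
# start_job / wait_for_job are placeholders, not real Beam or BigQuery APIs.
import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows

_SLEEP_DURATION_BETWEEN_POLLS = 10


def start_job(destination, files):
  # Stand-in for BigQueryWrapper.perform_load_job(...); returns a job reference.
  return 'job-for-%s' % destination


def wait_for_job(job_ref, sleep_duration_sec):
  # Stand-in for BigQueryWrapper.wait_for_bq_job(...); blocks until done.
  pass


class TriggerJobsThenWait(beam.DoFn):
  def start_bundle(self):
    # Jobs started during this bundle; waited on in finish_bundle.
    self.pending_jobs = []

  def process(self, element):
    destination, files = element
    job_ref = start_job(destination, files)
    # Buffer instead of yielding immediately, so nothing is emitted until the
    # job has finished. finish_bundle must emit WindowedValue objects, hence
    # GlobalWindows.windowed_value here.
    self.pending_jobs.append(
        GlobalWindows.windowed_value((destination, job_ref)))

  def finish_bundle(self):
    for windowed_value in self.pending_jobs:
      _, job_ref = windowed_value.value
      wait_for_job(job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
      yield windowed_value
```

In the actual `TriggerCopyJobs` change, `finish_bundle` additionally yields a `pvalue.TaggedOutput(TRIGGER_DELETE_TEMP_TABLES, ...)` sentinel once all copy jobs are done, which replaces the removed `WaitForBQJobs` impulse as the signal that temp tables may be deleted.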