Skip to content

Commit

Permalink
Fix WRITE_EMPTY disposition fail duplicate copy job (#26608)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored May 15, 2023
1 parent 5df8c9e commit edde37f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,10 @@ def display_data(self):
True, label="This Dataflow job launches bigquery jobs.")
}

def start_bundle(self):
def setup(self):
self._observed_tables = set()

def start_bundle(self):
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
Expand Down Expand Up @@ -1093,9 +1095,10 @@ def _load_data(
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

if self.write_disposition == 'WRITE_TRUNCATE':
if self.write_disposition in ('WRITE_EMPTY', 'WRITE_TRUNCATE'):
# All loads going to the same table must be processed together so that
# the truncation happens only once. See BEAM-24535.
# the truncation happens only once. See
# https://github.com/apache/beam/issues/24535.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination,
Expand Down
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
Expand Down Expand Up @@ -715,10 +716,15 @@ def test_multiple_partition_files(self):
equal_to([6]),
label='CheckCopyJobCount')

@parameterized.expand([
param(write_disposition=BigQueryDisposition.WRITE_TRUNCATE),
param(write_disposition=BigQueryDisposition.WRITE_EMPTY)
])
@mock.patch(
'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process',
wraps=lambda *x: None)
def test_multiple_partition_files_write_truncate(self, mock_call_process):
def test_multiple_partition_files_write_dispositions(
self, mock_call_process, write_disposition):
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
Expand Down Expand Up @@ -751,8 +757,7 @@ def test_multiple_partition_files_write_truncate(self, mock_call_process):
max_file_size=45,
max_partition_size=80,
max_files_per_partition=2,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

write_disposition=write_disposition))
# TriggerCopyJob only processes once
self.assertEqual(mock_call_process.call_count, 1)

Expand Down

0 comments on commit edde37f

Please sign in to comment.