Skip to content

Commit

Permalink
Fix truncate copyjob when WRITE_TRUNCATE in BigQuery batch load
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Jan 20, 2023
1 parent 148730e commit 15556f0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@

* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
* Fixed JDBC connection failures (Java) during handshake due to deprecated TLSv1(.1) protocol for the JDK. ([#24623](https://github.com/apache/beam/issues/24623))
* Fixed an issue where Python BigQuery Batch Load writes could truncate valid data when the write disposition is set to WRITE_TRUNCATE and the incoming data is large (Python) ([#24535](https://github.com/apache/beam/issues/24535)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.44.0] - Unreleased
# [2.44.0] - 2023-01-12

## Highlights

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ def _load_data(
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

if self.create_disposition == 'WRITE_TRUNCATE':
if self.write_disposition == 'WRITE_TRUNCATE':
# All loads going to the same table must be processed together so that
# the truncation happens only once. See BEAM-24535.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,49 @@ def test_multiple_partition_files(self):
equal_to([6]),
label='CheckCopyJobCount')

@mock.patch(
    'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process',
    wraps=lambda *x: None)
def test_multiple_partition_files_write_truncate(self, mock_call_process):
  """Checks that WRITE_TRUNCATE triggers the copy-job step only once.

  ``TriggerCopyJobs.process`` is patched with a mock that delegates to a
  no-op lambda, so no copy job is issued and the mock only records how
  many times it was invoked. The pipeline writes ``_ELEMENTS`` to a single
  destination with small file/partition limits (presumably small enough
  to split the input across several partitions, as the test name implies
  -- confirm against ``_ELEMENTS``).

  Args:
    mock_call_process: The mock injected by ``mock.patch`` in place of
      ``TriggerCopyJobs.process``.
  """
  destination = 'project1:dataset1.table1'

  job_reference = bigquery_api.JobReference()
  job_reference.projectId = 'project1'
  job_reference.jobId = 'job_name1'
  result_job = mock.Mock()
  result_job.jobReference = job_reference

  # Every job lookup reports a finished job without an error result.
  mock_job = mock.Mock()
  mock_job.status.state = 'DONE'
  mock_job.status.errorResult = None
  mock_job.jobReference = job_reference

  bq_client = mock.Mock()
  bq_client.jobs.Get.return_value = mock_job

  bq_client.jobs.Insert.return_value = result_job
  bq_client.tables.Delete.return_value = None

  with TestPipeline('DirectRunner') as p:
    _ = (
        p
        | beam.Create(_ELEMENTS, reshuffle=False)
        | bqfl.BigQueryBatchFileLoads(
            destination,
            custom_gcs_temp_location=self._new_tempdir(),
            test_client=bq_client,
            validate=False,
            temp_file_format=bigquery_tools.FileFormat.JSON,
            max_file_size=45,
            max_partition_size=80,
            max_files_per_partition=2,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

  # TriggerCopyJob only processes once. The check sits after the ``with``
  # block so the pipeline has finished running before the count is read.
  self.assertEqual(mock_call_process.call_count, 1)

@parameterized.expand([
param(is_streaming=False, with_auto_sharding=False),
param(is_streaming=True, with_auto_sharding=False),
Expand Down

0 comments on commit 15556f0

Please sign in to comment.