Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix truncate copy job when WRITE_TRUNCATE in BigQuery batch load #25101

Merged
merged 2 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@

* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
* Fixed JDBC connection failures (Java) during handshake due to deprecated TLSv1(.1) protocol for the JDK. ([#24623](https://github.com/apache/beam/issues/24623))
* Fixed Python BigQuery Batch Load write may truncate valid data when deposition sets to WRITE_TRUNCATE and incoming data is large (Python) ([#24623](https://github.com/apache/beam/issues/24535)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.44.0] - Unreleased
# [2.44.0] - 2023-01-12

## Highlights

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ def _load_data(
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

if self.create_disposition == 'WRITE_TRUNCATE':
if self.write_disposition == 'WRITE_TRUNCATE':
# All loads going to the same table must be processed together so that
# the truncation happens only once. See BEAM-24535.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,49 @@ def test_multiple_partition_files(self):
equal_to([6]),
label='CheckCopyJobCount')

@mock.patch(
'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process',
wraps=lambda *x: None)
def test_multiple_partition_files_write_truncate(self, mock_call_process):
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
result_job = mock.Mock()
result_job.jobReference = job_reference

mock_job = mock.Mock()
mock_job.status.state = 'DONE'
mock_job.status.errorResult = None
mock_job.jobReference = job_reference

bq_client = mock.Mock()
bq_client.jobs.Get.return_value = mock_job

bq_client.jobs.Insert.return_value = result_job
bq_client.tables.Delete.return_value = None

mock_call_process

with TestPipeline('DirectRunner') as p:
_ = (
p
| beam.Create(_ELEMENTS, reshuffle=False)
| bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
temp_file_format=bigquery_tools.FileFormat.JSON,
max_file_size=45,
max_partition_size=80,
max_files_per_partition=2,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

# TriggerCopyJob only processes once
self.assertEqual(mock_call_process.call_count, 1)

@parameterized.expand([
param(is_streaming=False, with_auto_sharding=False),
param(is_streaming=True, with_auto_sharding=False),
Expand Down