diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index ada76e1ef484..ec3ba90e02ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -716,6 +716,18 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); + // If the final destination table exists already (and we're appending to it), then the temp + // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object + // with one that makes this happen. + // In the case schemaUpdateOptions are specified by the user, matching does not occur in order + // to respect those options. + DynamicDestinations destinations = dynamicDestinations; + if (schemaUpdateOptions.isEmpty()) { + destinations = + DynamicDestinationsHelpers.matchTableDynamicDestinations( + dynamicDestinations, bigQueryServices); + } + // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then // the import needs to be split into multiple partitions, and those partitions will be @@ -734,7 +746,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - dynamicDestinations, + destinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues,