split write rename and temp table deletion into two dofns with a shuffle (#30023)

* split write rename and temp table deletion into two dofns with a shuffle

* fix tests

* spotless + checkstyle

* remove unserializable data being passed between write-rename and table cleanup

* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java

Co-authored-by: Ahmed Abualsaud <[email protected]>

* address comments

* spotless

---------

Co-authored-by: Ahmed Abualsaud <[email protected]>
johnjcasey and ahmedabu98 authored Jan 22, 2024
1 parent d014a98 commit 173d834
Showing 4 changed files with 392 additions and 254 deletions.
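
At a high level, the commit replaces a single DoFn that both issued the rename (copy) jobs and deleted the temporary tables with two DoFns separated by a shuffle, so cleanup only runs against materialized rename results. A minimal sketch of that split-with-shuffle pattern follows; PerformRename, DeleteTempTables, and the use of Reshuffle.viaRandomKey() are illustrative assumptions, not the commit's actual classes:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical stand-ins for the two halves WriteRename is split into.
class PerformRename extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String tempTable, OutputReceiver<String> out) {
    // ... issue the copy job that renames tempTable into its final table ...
    out.output(tempTable); // pass the temp table name downstream for cleanup
  }
}

class DeleteTempTables extends DoFn<String, Void> {
  @ProcessElement
  public void process(@Element String tempTable) {
    // ... delete tempTable; the rename result is already checkpointed here ...
  }
}

class SplitWithShuffle {
  static void wire(PCollection<String> tempTables) {
    tempTables
        .apply("WriteRename", ParDo.of(new PerformRename()))
        // The shuffle materializes the rename output, so a retried cleanup
        // bundle cannot re-run or race with the rename work itself.
        .apply("ShuffleBeforeCleanup", Reshuffle.viaRandomKey())
        .apply("DeleteTempTables", ParDo.of(new DeleteTempTables()));
  }
}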
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -419,16 +419,15 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
                 .withSideInputs(sideInputsForUpdateSchema))
             .apply(
                 "WriteRenameTriggered",
-                ParDo.of(
-                        new WriteRename(
-                            bigQueryServices,
-                            copyJobIdPrefixView,
-                            writeDisposition,
-                            createDisposition,
-                            maxRetryJobs,
-                            kmsKey,
-                            loadJobProjectId))
-                    .withSideInputs(copyJobIdPrefixView));
+                new WriteRename(
+                    bigQueryServices,
+                    copyJobIdPrefixView,
+                    writeDisposition,
+                    createDisposition,
+                    maxRetryJobs,
+                    kmsKey,
+                    loadJobProjectId,
+                    copyJobIdPrefixView));
 
     PCollection<TableDestination> successfulSinglePartitionWrites =
         writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView)
@@ -517,16 +516,15 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input
                 .withSideInputs(sideInputsForUpdateSchema))
             .apply(
                 "WriteRenameUntriggered",
-                ParDo.of(
-                        new WriteRename(
-                            bigQueryServices,
-                            copyJobIdPrefixView,
-                            writeDisposition,
-                            createDisposition,
-                            maxRetryJobs,
-                            kmsKey,
-                            loadJobProjectId))
-                    .withSideInputs(copyJobIdPrefixView))
+                new WriteRename(
+                    bigQueryServices,
+                    copyJobIdPrefixView,
+                    writeDisposition,
+                    createDisposition,
+                    maxRetryJobs,
+                    kmsKey,
+                    loadJobProjectId,
+                    copyJobIdPrefixView))
             .setCoder(tableDestinationCoder);
 
     PCollectionList<TableDestination> allSuccessfulWrites =
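Both hunks above make the same call-site change: WriteRename is no longer wrapped in ParDo.of(...).withSideInputs(copyJobIdPrefixView) but applied directly, with the side-input view passed to its constructor instead. That is the shape of a composite PTransform that owns its side inputs and internal steps. A hedged sketch of that shape, with PerformRename and DeleteTempTables as defined in the first sketch and the class name hypothetical:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical shape of WriteRename after this commit: a composite transform
// that owns the rename ParDo, its side input, the shuffle, and the cleanup.
class WriteRenameLike extends PTransform<PCollection<String>, PCollection<Void>> {
  private final PCollectionView<String> jobIdView; // e.g. copyJobIdPrefixView

  WriteRenameLike(PCollectionView<String> jobIdView) {
    this.jobIdView = jobIdView;
  }

  @Override
  public PCollection<Void> expand(PCollection<String> input) {
    return input
        .apply("Rename", ParDo.of(new PerformRename()).withSideInputs(jobIdView))
        .apply("Shuffle", Reshuffle.viaRandomKey())
        .apply("Cleanup", ParDo.of(new DeleteTempTables()));
  }
}

At the call site this turns ParDo.of(new WriteRename(...)).withSideInputs(view) into new WriteRename(..., view), which is exactly the change both hunks show.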
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -32,6 +32,7 @@
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -168,7 +169,7 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backOff)
     }
   }
 
-  static class PendingJob {
+  static class PendingJob implements Serializable {
     private final SerializableFunction<RetryJobId, Void> executeJob;
     private final SerializableFunction<RetryJobId, Job> pollJob;
     private final SerializableFunction<RetryJobId, Job> lookupJob;
@@ -275,7 +276,7 @@ boolean shouldRetry() {
     }
   }
 
-  static class RetryJobId {
+  static class RetryJobId implements Serializable {
     private final String jobIdPrefix;
     private final int retryIndex;
 
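The implements Serializable additions are what let PendingJob and RetryJobId cross the new shuffle between the rename and cleanup DoFns: Beam needs a coder for everything in a PCollection, and SerializableCoder is available for Serializable types. PendingJob's fields are already SerializableFunctions, so marking the class itself completes the chain. A minimal sketch, with RetryJobIdLike as a hypothetical stand-in mirroring RetryJobId's fields:

import java.io.Serializable;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical stand-in mirroring RetryJobId's fields.
class RetryJobIdLike implements Serializable {
  private final String jobIdPrefix;
  private final int retryIndex;

  RetryJobIdLike(String jobIdPrefix, int retryIndex) {
    this.jobIdPrefix = jobIdPrefix;
    this.retryIndex = retryIndex;
  }
}

class CoderWiring {
  // Values that cross a shuffle need a coder; for Serializable types,
  // SerializableCoder can be set explicitly (Beam can also infer it).
  static PCollection<RetryJobIdLike> withCoder(PCollection<RetryJobIdLike> jobIds) {
    return jobIds.setCoder(SerializableCoder.of(RetryJobIdLike.class));
  }
}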