Skip to content

Commit

Permalink
Merge pull request apache#17365 from [BEAM-12482] Update Schema Desti…
Browse files Browse the repository at this point in the history
…nation during Bigquery load job when using temporary tables using zeroloadjob

* Added a zero row job to bigquery writeTable

* added unit test

* add sdf for update schema destination

* add loadjob with inputstreamcontent parameter

* add updateschemadest with zeroloadjob before copying to dest

* add test for write temp tables

* add nullness supress warning

* remove unnecesary variable and use global variable for updateschemadestination-dofn

Co-authored-by: Miguel Anzo <[email protected]>
  • Loading branch information
MarcoRob and MiguelAnzoWizeline authored May 13, 2022
1 parent 2d57753 commit 0a2aed7
Show file tree
Hide file tree
Showing 7 changed files with 531 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
tempTables
// Now that the load job has happened, we want the rename to happen immediately.
Expand All @@ -368,6 +372,22 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
.apply("GroupByKey", GroupByKey.create())
.apply("Extract Values", Values.create())
.apply(
ParDo.of(
new UpdateSchemaDestination(
bigQueryServices,
tempLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameTriggered",
ParDo.of(
Expand Down Expand Up @@ -444,9 +464,29 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
PCollection<TableDestination> successfulSinglePartitionWrites =
writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView)
.apply("ReifyRenameInput", new ReifyAsIterable<>())
.apply(
ParDo.of(
new UpdateSchemaDestination(
bigQueryServices,
tempLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameUntriggered",
ParDo.of(
Expand Down Expand Up @@ -679,17 +719,6 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
WritePartition.ResultCoder.INSTANCE);

// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
// with one that makes this happen.
@SuppressWarnings("unchecked")
DynamicDestinations<?, DestinationT> destinations = dynamicDestinations;
if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED)
|| createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
destinations =
DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices);
}

Coder<TableDestination> tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();

Expand All @@ -711,7 +740,7 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
destinations,
dynamicDestinations,
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
Expand All @@ -720,7 +749,7 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
useAvroLogicalTypes,
// Note that we can't pass through the schema update options when creating temporary
// tables. They also shouldn't be needed. See BEAM-12482 for additional details.
Collections.emptySet(),
schemaUpdateOptions,
tempDataset))
.setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
Expand Down Expand Up @@ -68,6 +69,14 @@ public interface JobService extends AutoCloseable {
/** Start a BigQuery load job. */
void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException;

/** Start a BigQuery load job with stream content. */
void startLoadJob(
JobReference jobRef,
JobConfigurationLoad loadConfig,
AbstractInputStreamContent streamContent)
throws InterruptedException, IOException;

/** Start a BigQuery extract job. */
void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
Expand Down Expand Up @@ -236,6 +237,28 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
startJob(job, errorExtractor, client);
}

/**
* {@inheritDoc}
*
* <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
* @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startLoadJob(
JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream)
throws InterruptedException, IOException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setLoad(loadConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff());
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -338,6 +361,47 @@ static void startJob(
lastException);
}

static void startJobStream(
Job job,
AbstractInputStreamContent streamContent,
ApiErrorExtractor errorExtractor,
Bigquery client,
Sleeper sleeper,
BackOff backOff)
throws IOException, InterruptedException {
JobReference jobReference = job.getJobReference();
Exception exception;
do {
try {
client
.jobs()
.insert(jobReference.getProjectId(), job, streamContent)
.setPrettyPrint(false)
.execute();
LOG.info(
"Started BigQuery job: {}.\n{}",
jobReference,
formatBqStatusCommand(jobReference.getProjectId(), jobReference.getJobId()));
return;
} catch (IOException e) {
if (errorExtractor.itemAlreadyExists(e)) {
LOG.info(
"BigQuery job " + jobReference + " already exists, will not retry inserting it:",
e);
return; // SUCCEEDED
}
// ignore and retry
LOG.info("Failed to insert job " + jobReference + ", will retry:", e);
exception = e;
}
} while (nextBackOff(sleeper, backOff));
throw new IOException(
String.format(
"Unable to insert job: %s, aborting after %d .",
jobReference.getJobId(), MAX_RPC_RETRIES),
exception);
}

@Override
public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
BackOff backoff =
Expand Down
Loading

0 comments on commit 0a2aed7

Please sign in to comment.