diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ab7df6f081..f4094e5274 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -16,14 +16,19 @@ package com.google.cloud.dataflow.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.QueryRequest; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.dataflow.sdk.coders.AtomicCoder; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.Coder.Context; import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StandardCoder; @@ -31,10 +36,12 @@ import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.BigQuerySink; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Aggregator; @@ -44,22 +51,29 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.BigQueryServices; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.LoadService; +import com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl; import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter; import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator; +import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.util.PropertyNames; import com.google.cloud.dataflow.sdk.util.Reshuffle; import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal; import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import 
com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -70,6 +84,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -724,12 +741,12 @@ public static Bound withoutValidation() { * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table. */ public static class Bound extends PTransform, PDone> { - final TableReference table; + @Nullable final String jsonTableRef; - final SerializableFunction tableRefFunction; + @Nullable final SerializableFunction tableRefFunction; // Table schema. The schema is required only if the table does not exist. - final TableSchema schema; + @Nullable final String jsonSchema; // Options for creating the table. Valid values are CREATE_IF_NEEDED and // CREATE_NEVER. @@ -742,6 +759,9 @@ public static class Bound extends PTransform, PDone> { // An option to indicate if table validation is desired. Default is true. final boolean validate; + // A fake or mock BigQueryServices for tests. + @Nullable private BigQueryServices testBigQueryServices; + private static class TranslateTableSpecFunction implements SerializableFunction { private SerializableFunction tableSpecFunction; @@ -764,20 +784,22 @@ public TableReference apply(BoundedWindow value) { @Deprecated public Bound() { this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, true); + WriteDisposition.WRITE_EMPTY, true, null); } - private Bound(String name, TableReference ref, - SerializableFunction tableRefFunction, TableSchema schema, - CreateDisposition createDisposition, WriteDisposition writeDisposition, - boolean validate) { + private Bound(String name, @Nullable String jsonTableRef, + @Nullable SerializableFunction tableRefFunction, + @Nullable String jsonSchema, + CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, + @Nullable BigQueryServices testBigQueryServices) { super(name); - this.table = ref; + this.jsonTableRef = jsonTableRef; this.tableRefFunction = tableRefFunction; - this.schema = schema; - this.createDisposition = createDisposition; - this.writeDisposition = writeDisposition; + this.jsonSchema = jsonSchema; + this.createDisposition = checkNotNull(createDisposition, "createDisposition"); + this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); this.validate = validate; + this.testBigQueryServices = testBigQueryServices; } /** @@ -786,8 +808,8 @@ private Bound(String name, TableReference ref, *
<p>
Does not modify this object. */ public Bound named(String name) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -806,8 +828,8 @@ public Bound to(String tableSpec) { *
<p>
Does not modify this object. */ public Bound to(TableReference table) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -835,8 +857,8 @@ public Bound to( */ public Bound toTableReference( SerializableFunction tableRefFunction) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -846,8 +868,8 @@ public Bound toTableReference( *
<p>
Does not modify this object. */ public Bound withSchema(TableSchema schema) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema), + createDisposition, writeDisposition, validate, testBigQueryServices); } /** @@ -856,8 +878,8 @@ public Bound withSchema(TableSchema schema) { *
<p>
Does not modify this object. */ public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -866,8 +888,8 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) { *
<p>
Does not modify this object. */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -876,8 +898,14 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) { *
<p>
Does not modify this object. */ public Bound withoutValidation() { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, false); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, false, testBigQueryServices); + } + + @VisibleForTesting + Bound withTestServices(BigQueryServices testServices) { + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testServices); } private static void verifyTableEmpty( @@ -906,6 +934,7 @@ private static void verifyTableEmpty( public PDone apply(PCollection input) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + TableReference table = getTable(); if (table == null && tableRefFunction == null) { throw new IllegalStateException( "must set the table reference of a BigQueryIO.Write transform"); @@ -916,7 +945,7 @@ public PDone apply(PCollection input) { + "transform"); } - if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) { + if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) { throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, " + "however no schema was provided."); } @@ -935,7 +964,7 @@ public PDone apply(PCollection input) { // of the pipeline. For these cases the withoutValidation method can be used to disable // the check. // Unfortunately we can't validate anything early in case tableRefFunction is specified. - if (table != null && validate) { + if (jsonTableRef != null && validate) { verifyDatasetPresence(options, table); if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { verifyTablePresence(options, table); @@ -958,10 +987,34 @@ public PDone apply(PCollection input) { + "supported for unbounded PCollections or when using tablespec functions."); } - return input.apply(new StreamWithDeDup(table, tableRefFunction, schema)); + return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema())); } - return PDone.in(input.getPipeline()); + String tempLocation = options.getTempLocation(); + checkArgument(!Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Write needs a GCS temp location to store temp files."); + if (testBigQueryServices == null) { + try { + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), e); + } + } + String jobIdToken = UUID.randomUUID().toString(); + String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken; + BigQueryServices bqServices = getBigQueryServices(); + return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( + new BigQuerySink( + jobIdToken, + toJsonString(table), + jsonSchema, + getWriteDisposition(), + getCreateDisposition(), + tempFilePrefix, + input.getCoder(), + bqServices))); } @Override @@ -969,17 +1022,6 @@ protected Coder getDefaultOutputCoder() { return VoidCoder.of(); } - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateWriteHelper(transform, context); - } - }); - } - /** Returns the create disposition. 
     */
    public CreateDisposition getCreateDisposition() {
      return createDisposition;
    }
@@ -992,24 +1034,191 @@ public WriteDisposition getWriteDisposition() {
     /** Returns the table schema. */
     public TableSchema getSchema() {
-      return schema;
+      return fromJsonString(jsonSchema, TableSchema.class);
     }
 
     /** Returns the table reference, or {@code null} if a table reference function is used. */
     public TableReference getTable() {
-      return table;
+      return fromJsonString(jsonTableRef, TableReference.class);
     }
 
     /** Returns {@code true} if table validation is enabled. */
     public boolean getValidate() {
       return validate;
     }
+
+    private BigQueryServices getBigQueryServices() {
+      if (testBigQueryServices != null) {
+        return testBigQueryServices;
+      } else {
+        return new BigQueryServicesImpl();
+      }
+    }
   }
 
   /** Disallow construction of utility class. */
   private Write() {}
 }
 
+  /**
+   * {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
+   *
+   * <p>It uses BigQuery load jobs to import files into BigQuery.
+   */
+  static class BigQuerySink extends FileBasedSink<TableRow> {
+    private final String jobIdToken;
+    @Nullable private final String jsonTable;
+    @Nullable private final String jsonSchema;
+    private final WriteDisposition writeDisposition;
+    private final CreateDisposition createDisposition;
+    private final Coder<TableRow> coder;
+    private final BigQueryServices bqServices;
+
+    public BigQuerySink(
+        String jobIdToken,
+        @Nullable String jsonTable,
+        @Nullable String jsonSchema,
+        WriteDisposition writeDisposition,
+        CreateDisposition createDisposition,
+        String tempFile,
+        Coder<TableRow> coder,
+        BigQueryServices bqServices) {
+      super(tempFile, ".json");
+      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+      this.jsonTable = jsonTable;
+      this.jsonSchema = jsonSchema;
+      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+      this.coder = checkNotNull(coder, "coder");
+      this.bqServices = checkNotNull(bqServices, "bqServices");
+    }
+
+    @Override
+    public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
+        PipelineOptions options) {
+      return new BigQueryWriteOperation(this);
+    }
+
+    private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
+      // The maximum number of retry load jobs.
+      private static final int MAX_RETRY_LOAD_JOBS = 3;
+
+      private final BigQuerySink bigQuerySink;
+
+      private BigQueryWriteOperation(BigQuerySink sink) {
+        super(checkNotNull(sink, "sink"));
+        this.bigQuerySink = sink;
+      }
+
+      @Override
+      public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
+        return new TableRowWriter(this, bigQuerySink.coder);
+      }
+
+      @Override
+      public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
+          throws IOException, InterruptedException {
+        try {
+          BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+          List<String> tempFiles = Lists.newArrayList();
+          for (FileResult result : writerResults) {
+            tempFiles.add(result.getFilename());
+          }
+          if (!tempFiles.isEmpty()) {
+            load(
+                bigQuerySink.bqServices.getLoadService(bqOptions),
+                bigQuerySink.jobIdToken,
+                fromJsonString(bigQuerySink.jsonTable, TableReference.class),
+                tempFiles,
+                fromJsonString(bigQuerySink.jsonSchema, TableSchema.class),
+                bigQuerySink.writeDisposition,
+                bigQuerySink.createDisposition);
+          }
+        } finally {
+          removeTemporaryFiles(options);
+        }
+      }
+
+      /**
+       * Import files into BigQuery with load jobs.
+       *
+       * <p>Returns once the files are successfully loaded into BigQuery.
+       * Throws a RuntimeException if:
+       * 1. The status of one load job is UNKNOWN, since retrying in that case could
+       *    duplicate data.
+       * 2. The number of retries exceeds {@code MAX_RETRY_LOAD_JOBS}.
+       *
+       * <p>If a load job fails, it is retried with a different job id.
+       */
+      private void load(
+          LoadService loadService,
+          String jobIdPrefix,
+          TableReference ref,
+          List<String> gcsUris,
+          @Nullable TableSchema schema,
+          WriteDisposition writeDisposition,
+          CreateDisposition createDisposition) throws InterruptedException, IOException {
+        JobConfigurationLoad loadConfig = new JobConfigurationLoad();
+        loadConfig.setSourceUris(gcsUris);
+        loadConfig.setDestinationTable(ref);
+        loadConfig.setSchema(schema);
+        loadConfig.setWriteDisposition(writeDisposition.name());
+        loadConfig.setCreateDisposition(createDisposition.name());
+        loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
+
+        boolean retrying = false;
+        String projectId = ref.getProjectId();
+        for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
+          String jobId = jobIdPrefix + "-" + i;
+          if (retrying) {
+            LOG.info("Previous load jobs failed, retrying.");
+          }
+          LOG.info("Starting BigQuery load job: {}", jobId);
+          loadService.startLoadJob(jobId, loadConfig);
+          BigQueryServices.Status jobStatus = loadService.pollJobStatus(projectId, jobId);
+          switch (jobStatus) {
+            case SUCCEEDED:
+              return;
+            case UNKNOWN:
+              throw new RuntimeException("Failed to poll the load job status.");
+            case FAILED:
+              LOG.info("BigQuery load job failed: {}", jobId);
+              retrying = true;
+              continue;
+            default:
+              throw new IllegalStateException("Unexpected job status: " + jobStatus);
+          }
+        }
+        throw new RuntimeException(
+            "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
+      }
+    }
+
+    private static class TableRowWriter extends FileBasedWriter<TableRow> {
+      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+      private final Coder<TableRow> coder;
+      private OutputStream out;
+
+      public TableRowWriter(
+          FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
+        super(writeOperation);
+        this.mimeType = MimeTypes.TEXT;
+        this.coder = coder;
+      }
+
+      @Override
+      protected void prepareWrite(WritableByteChannel channel) throws Exception {
+        out = Channels.newOutputStream(channel);
+      }
+
+      @Override
+      public void write(TableRow value) throws Exception {
+        // Use Context.OUTER to encode and NEWLINE as the delimiter.
+        coder.encode(value, out, Context.OUTER);
+        out.write(NEWLINE);
+      }
+    }
+  }
+
   private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
     try {
       Bigquery client = Transport.newBigQueryClient(options).build();
@@ -1400,6 +1609,32 @@ public PDone apply(PCollection<TableRow> input) {
     }
   }
 
+  private static String toJsonString(Object item) {
+    if (item == null) {
+      return null;
+    }
+    try {
+      return JSON_FACTORY.toString(item);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
+          e);
+    }
+  }
+
+  private static <T> T fromJsonString(String json, Class<T> clazz) {
+    if (json == null) {
+      return null;
+    }
+    try {
+      return JSON_FACTORY.fromString(json, clazz);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
+          e);
+    }
+  }
+
 /////////////////////////////////////////////////////////////////////////////
 
   /** Disallow construction of utility class. */
@@ -1450,50 +1685,4 @@ private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
     }
     return value;
   }
-
-  /**
-   * Direct mode write evaluator.
-   *
-   * <p>
This writes the entire table in a single BigQuery request. - * The table will be created if necessary. - */ - private static void evaluateWriteHelper( - Write.Bound transform, DirectPipelineRunner.EvaluationContext context) { - BigQueryOptions options = context.getPipelineOptions(); - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); - - try { - Map> tableRows = new HashMap<>(); - for (WindowedValue windowedValue : context.getPCollectionWindowedValues( - context.getInput(transform))) { - for (BoundedWindow window : windowedValue.getWindows()) { - TableReference ref; - if (transform.tableRefFunction != null) { - ref = transform.tableRefFunction.apply(window); - } else { - ref = transform.table; - } - if (ref.getProjectId() == null) { - ref.setProjectId(options.getProject()); - } - - List rows = getOrCreateMapListValue(tableRows, ref); - rows.add(windowedValue.getValue()); - } - } - - for (TableReference ref : tableRows.keySet()) { - LOG.info("Writing to BigQuery table {}", toTableSpec(ref)); - // {@link BigQueryTableInserter#getOrCreateTable} validates {@link CreateDisposition} - // and {@link WriteDisposition}. - // For each {@link TableReference}, it can only be called before rows are written. - inserter.getOrCreateTable( - ref, transform.writeDisposition, transform.createDisposition, transform.schema); - inserter.insertAll(ref, tableRows.get(ref)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java new file mode 100644 index 0000000000..85c39ea84b --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.cloud.dataflow.sdk.options.BigQueryOptions; + +import java.io.IOException; +import java.io.Serializable; + +/** + * An interface for real, mock, or fake implementations of Cloud BigQuery services. + */ +public interface BigQueryServices extends Serializable { + + /** + * Status of a BigQuery job or request. + */ + enum Status { + SUCCEEDED, + FAILED, + UNKNOWN, + } + + /** + * Returns a real, mock, or fake {@link LoadService}. + */ + public LoadService getLoadService(BigQueryOptions bqOptions); + + /** + * An interface for the Cloud BigQuery load service. + */ + public interface LoadService { + /** + * Start a BigQuery load job. + */ + public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException; + + /** + * Poll the status of a BigQuery load job. 
+     */
+ */ + public Status pollJobStatus(String projectId, String jobId) + throws InterruptedException, IOException; + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java new file mode 100644 index 0000000000..342095baa7 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -0,0 +1,194 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.dataflow.sdk.options.BigQueryOptions; +import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery + * service. + */ +public class BigQueryServicesImpl implements BigQueryServices { + + // The maximum number of attempts to execute a load job RPC. + private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10; + + // The initial backoff for executing a load job RPC. + private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + // The maximum number of retries to poll the status of a load job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery load job finishes. + private static final int MAX_LOAD_JOB_POLL_RETRIES = Integer.MAX_VALUE; + + // The initial backoff for polling the status of a load job. 
+ private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60); + + @Override + public LoadService getLoadService(BigQueryOptions options) { + return new LoadServiceImpl(options); + } + + @VisibleForTesting + static class LoadServiceImpl implements BigQueryServices.LoadService { + private static final Logger LOG = LoggerFactory.getLogger(LoadServiceImpl.class); + + private final ApiErrorExtractor errorExtractor; + private final Bigquery client; + + @VisibleForTesting + LoadServiceImpl(Bigquery client) { + this.errorExtractor = new ApiErrorExtractor(); + this.client = client; + } + + private LoadServiceImpl(BigQueryOptions options) { + this.errorExtractor = new ApiErrorExtractor(); + this.client = Transport.newBigQueryClient(options).build(); + } + + /** + * {@inheritDoc} + * + *
<p>
Retries the RPC for at most {@code MAX_LOAD_JOB_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ + @Override + public void startLoadJob( + String jobId, + JobConfigurationLoad loadConfig) throws InterruptedException, IOException { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); + startLoadJob(jobId, loadConfig, Sleeper.DEFAULT, backoff); + } + + /** + * {@inheritDoc} + * + *
<p>
Retries the poll request for at most {@code MAX_LOAD_JOB_POLL_RETRIES} times + * until the job is DONE. + */ + @Override + public Status pollJobStatus(String projectId, String jobId) throws InterruptedException { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS); + return pollJobStatus(projectId, jobId, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + void startLoadJob( + String jobId, + JobConfigurationLoad loadConfig, + Sleeper sleeper, + BackOff backoff) + throws InterruptedException, IOException { + TableReference ref = loadConfig.getDestinationTable(); + String projectId = ref.getProjectId(); + + Job job = new Job(); + JobReference jobRef = new JobReference(); + jobRef.setProjectId(projectId); + jobRef.setJobId(jobId); + job.setJobReference(jobRef); + JobConfiguration config = new JobConfiguration(); + config.setLoad(loadConfig); + job.setConfiguration(config); + + Exception lastException = null; + do { + try { + client.jobs().insert(projectId, job).execute(); + return; // SUCCEEDED + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemAlreadyExists(e)) { + return; // SUCCEEDED + } + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to insert job: %s, aborting after %d retries.", + jobId, MAX_LOAD_JOB_RPC_ATTEMPTS), + lastException); + } + + @VisibleForTesting + Status pollJobStatus( + String projectId, + String jobId, + Sleeper sleeper, + BackOff backoff) throws InterruptedException { + do { + try { + JobStatus status = client.jobs().get(projectId, jobId).execute().getStatus(); + if (status != null && status.getState() != null && status.getState().equals("DONE")) { + if (status.getErrorResult() != null) { + return Status.FAILED; + } else if (status.getErrors() != null && !status.getErrors().isEmpty()) { + return Status.FAILED; + } else { + return Status.SUCCEEDED; + } + } + // The job is not DONE, wait longer and retry. + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry polling job status.", e); + } + } while (nextBackOff(sleeper, backoff)); + LOG.warn("Unable to poll job status: {}, aborting after {} retries.", + jobId, MAX_LOAD_JOB_POLL_RETRIES); + return Status.UNKNOWN; + } + + /** + * Identical to {@link BackOffUtils#next} but without checked IOException. 
+ * @throws InterruptedException + */ + private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index a081de095c..589420ebf1 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -92,10 +92,10 @@ private void checkWriteObjectWithValidate( BigQueryIO.Write.Bound bound, String project, String dataset, String table, TableSchema schema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate) { - assertEquals(project, bound.table.getProjectId()); - assertEquals(dataset, bound.table.getDatasetId()); - assertEquals(table, bound.table.getTableId()); - assertEquals(schema, bound.schema); + assertEquals(project, bound.getTable().getProjectId()); + assertEquals(dataset, bound.getTable().getDatasetId()); + assertEquals(table, bound.getTable().getTableId()); + assertEquals(schema, bound.getSchema()); assertEquals(createDisposition, bound.createDisposition); assertEquals(writeDisposition, bound.writeDisposition); assertEquals(validate, bound.validate); @@ -350,15 +350,11 @@ private void testWriteValidatesDataset(boolean streaming) { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - try { - p.apply(Create.of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable") - .to(tableRef) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withSchema(new TableSchema())); - } finally { - Assert.assertEquals("someproject", tableRef.getProjectId()); - } + p.apply(Create.of().withCoder(TableRowJsonCoder.of())) + .apply(BigQueryIO.Write.named("WriteMyTable") + .to(tableRef) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema())); } @Test diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java new file mode 100644 index 0000000000..08e6686646 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; +import com.google.api.client.googleapis.json.GoogleJsonErrorContainer; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.GenericJson; +import com.google.api.client.json.Json; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; +import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; +import com.google.common.collect.ImmutableList; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Tests for {@link BigQueryServicesImpl}. + */ +@RunWith(JUnit4.class) +public class BigQueryServicesImplTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class); + @Mock private LowLevelHttpResponse response; + private Bigquery bigquery; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + // A mock transport that lets us mock the API responses. + MockHttpTransport transport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return response; + } + }) + .build(); + + // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer. + bigquery = + new Bigquery.Builder( + transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) + .build(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds. 
+ */ + @Test + public void testStartLoadJobSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds + * with an already exist job. + */ + @Test + public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException { + when(response.getStatusCode()).thenReturn(409); // 409 means already exists + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds with a retry. + */ + @Test + public void testStartLoadJobRetry() throws IOException, InterruptedException { + Job testJob = new Job(); + + // First response is 403 rate limited, second response has valid payload. + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} succeeds. 
+ */ + @Test + public void testPollJobStatusSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE")); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryServices.Status.SUCCEEDED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} fails. + */ + @Test + public void testPollJobStatusFailed() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto())); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryServices.Status.FAILED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} returns UNKNOWN. + */ + @Test + public void testPollJobStatusUnknown() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus()); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + + assertEquals(BigQueryServices.Status.UNKNOWN, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** A helper to wrap a {@link GenericJson} object in a content stream. */ + private static InputStream toStream(GenericJson content) throws IOException { + return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content)); + } + + /** A helper that generates the error JSON payload that Google APIs produce. */ + private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) { + ErrorInfo info = new ErrorInfo(); + info.setReason(reason); + info.setDomain("global"); + // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one. + GoogleJsonError error = new GoogleJsonError(); + error.setErrors(ImmutableList.of(info)); + error.setCode(status); + // The actual JSON response is an error container. + GoogleJsonErrorContainer container = new GoogleJsonErrorContainer(); + container.setError(error); + return container; + } +} +
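Reviewer note (not part of the patch): a minimal usage sketch of the load-job-based batch write path this change introduces. Every name below (project, dataset, table, bucket, class name) is a hypothetical placeholder, and the schema is the smallest one that exercises withSchema.

```java
// Hypothetical end-to-end usage of the new batch write path. BigQueryIO.Write
// now requires a GCS temp location for its load files, supplied here via a
// placeholder flag such as --tempLocation=gs://my-bucket/tmp.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.common.collect.ImmutableList;

public class BigQueryLoadJobWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING")));

    p.apply(Create.of(new TableRow().set("name", "a"), new TableRow().set("name", "b"))
            .withCoder(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write.named("WriteMyTable")
            .to("my-project:my_dataset.my_table")  // hypothetical table spec
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_EMPTY));

    p.run();
  }
}
```

Because the input is bounded and no tablespec function is used, apply() takes the new file-based path: rows are written to temp JSON files under the temp location and then imported with a load job.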
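The withTestServices hook is what makes the sink testable without network access. Here is a sketch of a hand-rolled fake, assuming only the BigQueryServices interface added in this patch; the class itself is illustrative and not part of the change.

```java
// Illustrative fake of the BigQueryServices seam: it records nothing and
// reports success, which is enough to drive a pipeline through the batch
// write path without touching the real BigQuery API.
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.util.BigQueryServices;

public class FakeBigQueryServices implements BigQueryServices {
  @Override
  public LoadService getLoadService(BigQueryOptions bqOptions) {
    return new LoadService() {
      @Override
      public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) {
        // A fuller fake would capture jobId and loadConfig for assertions.
      }

      @Override
      public Status pollJobStatus(String projectId, String jobId) {
        // Pretend every load job finishes cleanly.
        return Status.SUCCEEDED;
      }
    };
  }
}
```

Since withTestServices is package-private and @VisibleForTesting, a test in the same package would build the transform as `.withTestServices(new FakeBigQueryServices()).withoutValidation()` so that no dataset lookup or real load job is attempted.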
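On the Context.OUTER comment in TableRowWriter: in the outer context, TableRowJsonCoder writes the bare JSON bytes with no length prefix, so appending a newline after each row yields exactly the NEWLINE_DELIMITED_JSON source format that the load configuration declares. A standalone sketch of that contract (the class name is hypothetical):

```java
// Demonstrates the newline-delimited JSON encoding used for the temp load files.
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder.Context;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class NdjsonEncodingSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    TableRowJsonCoder coder = TableRowJsonCoder.of();
    for (String name : new String[] {"a", "b"}) {
      // Context.OUTER: the value's raw encoding, with no length prefix.
      coder.encode(new TableRow().set("name", name), out, Context.OUTER);
      out.write("\n".getBytes(StandardCharsets.UTF_8));
    }
    // Prints two lines: {"name":"a"} then {"name":"b"}.
    System.out.print(out.toString(StandardCharsets.UTF_8.name()));
  }
}
```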