Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DLQ support to JdbcToBigQuery using BQ Storage Write API #2016

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Ascii;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

Expand All @@ -76,18 +77,23 @@
extends PTransform<PCollection<FailsafeElement<String, String>>, WriteResult> {

public static Builder newBuilder() {
return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder();
return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder()
.setUseWindowedTimestamp(true);

Check warning on line 81 in v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java

View check run for this annotation

Codecov / codecov/patch

v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java#L80-L81

Added lines #L80 - L81 were not covered by tests
}

public abstract String getErrorRecordsTable();

public abstract String getErrorRecordsTableSchema();

public abstract boolean getUseWindowedTimestamp();

@Override
public WriteResult expand(PCollection<FailsafeElement<String, String>> failedRecords) {

return failedRecords
.apply("FailedRecordToTableRow", ParDo.of(new FailedStringToTableRowFn()))
.apply(

Check warning on line 94 in v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java

View check run for this annotation

Codecov / codecov/patch

v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java#L94

Added line #L94 was not covered by tests
"FailedRecordToTableRow",
ParDo.of(new FailedStringToTableRowFn(getUseWindowedTimestamp())))

Check warning on line 96 in v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java

View check run for this annotation

Codecov / codecov/patch

v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java#L96

Added line #L96 was not covered by tests
.apply(
"WriteFailedRecordsToBigQuery",
BigQueryIO.writeTableRows()
Expand All @@ -104,6 +110,8 @@

public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema);

public abstract Builder setUseWindowedTimestamp(boolean useWindowedTimestamp);

public abstract WriteStringMessageErrors build();
}
}
Expand All @@ -115,6 +123,16 @@
public static class FailedStringToTableRowFn
extends DoFn<FailsafeElement<String, String>, TableRow> {

// Whether to stamp error rows with the element's window timestamp (true) or with
// wall-clock time at processing (false) — see the timestamp selection in
// processElement. NOTE(review): not final only because AutoValue-style builders are
// not used here; the value is never mutated after construction.
private boolean useWindowedTimestamp;

/** Creates a function that uses the element's window timestamp (legacy default). */
public FailedStringToTableRowFn() {
  this(true);
}

/**
 * Creates a function with explicit timestamp behavior.
 *
 * @param useWindowedTimestamp if {@code true}, error rows use the element's timestamp;
 *     otherwise they use {@code Instant.now()} at processing time.
 */
public FailedStringToTableRowFn(boolean useWindowedTimestamp) {
  this.useWindowedTimestamp = useWindowedTimestamp;
}

/**
* The formatter used to convert timestamps into a BigQuery compatible <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type">format</a>.
Expand All @@ -129,7 +147,9 @@

// Format the timestamp for insertion
String timestamp =
TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
TIMESTAMP_FORMATTER.print(
(useWindowedTimestamp ? context.timestamp() : Instant.now())
.toDateTime(DateTimeZone.UTC));

// Build the table row
TableRow failedRow =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,19 @@ public interface JdbcToBigQueryOptions
String getBigQuerySchemaPath();

void setBigQuerySchemaPath(String path);

@TemplateParameter.BigQueryTable(
    order = 21,
    optional = true,
    description =
        "Table for messages that failed to reach the output table (i.e., Deadletter table) when using Storage Write API",
    helpText =
        "The BigQuery table to use for messages that failed to reach the output table, "
            + "formatted as `\"PROJECT_ID:DATASET_NAME.TABLE_NAME\"`. If the table "
            + "doesn't exist, it is created when the pipeline runs. "
            // Fixed: the previous text joined "…write errors." and "This parameter…"
            // without a separating space, rendering as "errors.This parameter".
            + "If this parameter is not specified, the pipeline will fail on write errors. "
            + "This parameter can only be specified if `useStorageWriteApi` or `useStorageWriteApiAtLeastOnce` is set to true.")
String getOutputDeadletterTable();

void setOutputDeadletterTable(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,45 @@
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
Expand Down Expand Up @@ -69,6 +86,10 @@
})
public class JdbcToBigQuery {

/** Coder for FailsafeElement. */
private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
Expand Down Expand Up @@ -97,6 +118,12 @@
static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
// Validate BQ STORAGE_WRITE_API options
BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
if (!options.getUseStorageWriteApi()
&& !options.getUseStorageWriteApiAtLeastOnce()
&& !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
throw new IllegalArgumentException(

Check warning on line 124 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L124

Added line #L124 was not covered by tests
"outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
}

// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
Expand Down Expand Up @@ -175,12 +202,71 @@
/*
* Step 2: Append TableRow to an existing BigQuery table
*/
rows.apply("Write to BigQuery", writeToBQ);
WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

/*
* Step 3.
* If using Storage Write API, capture failed inserts and either
* a) write error rows to DLQ
* b) fail the pipeline
*/
if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
PCollection<BigQueryInsertError> insertErrors =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
/*
* Step 3a.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =

Check warning on line 222 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L222

Added line #L222 was not covered by tests
insertErrors
.apply(

Check warning on line 224 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L224

Added line #L224 was not covered by tests
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);

Check warning on line 228 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L226-L228

Added lines #L226 - L228 were not covered by tests

/*
* Step 3a Contd.
* Insert records that failed insert into deadletter table
*/
failedInserts.apply(

Check warning on line 234 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L234

Added line #L234 was not covered by tests
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(options.getOutputDeadletterTable())
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.setUseWindowedTimestamp(false)
.build());
} else {

Check warning on line 241 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L236-L241

Added lines #L236 - L241 were not covered by tests
/*
* Step 3b.
* Fail pipeline upon write errors if no DLQ was specified
*/
insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
Abacn marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Execute the pipeline and return the result.
return pipeline.run();
}

static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
@ProcessElement
public void processElement(ProcessContext c) {
BigQueryInsertError insertError = Objects.requireNonNull(c.element());
List<String> errorMessages =
insertError.getError().getErrors().stream()
.map(ErrorProto::getMessage)
.collect(Collectors.toList());
String stackTrace = String.join("\nCaused by:", errorMessages);

Check warning on line 262 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L257-L262

Added lines #L257 - L262 were not covered by tests

throw new IllegalStateException(
String.format(
"Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));

Check warning on line 266 in v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java

View check run for this annotation

Codecov / codecov/patch

v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java#L264-L266

Added lines #L264 - L266 were not covered by tests
}
}

/**
* Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
*/
Expand All @@ -198,12 +284,16 @@
: Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(
StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
.withExtendedErrorInfo()
.to(options.getOutputTable());

if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
!= Write.CreateDisposition.CREATE_NEVER) {
write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
}
if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
}

return write;
}
Expand Down
Loading
Loading