From 08e9a28768adc1eb80fb2a5bbab29ada562b3b11 Mon Sep 17 00:00:00 2001
From: Jeffrey Kinard
Date: Fri, 15 Nov 2024 17:28:35 -0500
Subject: [PATCH 1/2] Add DLQ support to JdbcToBigQuery using BQ Storage Write API

Signed-off-by: Jeffrey Kinard
---
 .../v2/transforms/ErrorConverters.java        | 23 ++++-
 .../v2/options/JdbcToBigQueryOptions.java     | 15 +++
 .../teleport/v2/templates/JdbcToBigQuery.java | 92 ++++++++++++++++++-
 .../v2/templates/JdbcToBigQueryIT.java        | 73 +++++++++++++--
 4 files changed, 193 insertions(+), 10 deletions(-)

diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java
index 49466183b7..51a294ca8d 100644
--- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java
+++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java
@@ -59,6 +59,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Ascii;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
@@ -83,11 +84,15 @@ public static Builder newBuilder() {
 
     public abstract String getErrorRecordsTableSchema();
 
+    public abstract boolean getUseWindowedTimestamp();
+
     @Override
     public WriteResult expand(PCollection<FailsafeElement<String, String>> failedRecords) {
       return failedRecords
-          .apply("FailedRecordToTableRow", ParDo.of(new FailedStringToTableRowFn()))
+          .apply(
+              "FailedRecordToTableRow",
+              ParDo.of(new FailedStringToTableRowFn(getUseWindowedTimestamp())))
           .apply(
               "WriteFailedRecordsToBigQuery",
               BigQueryIO.writeTableRows()
@@ -104,6 +109,8 @@ public abstract static class Builder {
 
       public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema);
 
+      public abstract Builder setUseWindowedTimestamp(boolean useWindowedTimestamp);
+
       public abstract WriteStringMessageErrors build();
     }
   }
@@ -115,6 +122,16 @@ public abstract static class Builder {
   public static class FailedStringToTableRowFn
       extends DoFn<FailsafeElement<String, String>, TableRow> {
 
+    private boolean useWindowedTimestamp;
+
+    public FailedStringToTableRowFn() {
+      this(true);
+    }
+
+    public FailedStringToTableRowFn(boolean useWindowedTimestamp) {
+      this.useWindowedTimestamp = useWindowedTimestamp;
+    }
+
     /**
      * The formatter used to convert timestamps into a BigQuery compatible format.
@@ -129,7 +146,9 @@ public void processElement(ProcessContext context) {
 
       // Format the timestamp for insertion
       String timestamp =
-          TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
+          TIMESTAMP_FORMATTER.print(
+              (useWindowedTimestamp ? context.timestamp() : Instant.now())
+                  .toDateTime(DateTimeZone.UTC));
 
       // Build the table row
       TableRow failedRow =

diff --git a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/JdbcToBigQueryOptions.java b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/JdbcToBigQueryOptions.java
index 5fd75bb9f1..8f2769b483 100644
--- a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/JdbcToBigQueryOptions.java
+++ b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/JdbcToBigQueryOptions.java
@@ -256,4 +256,19 @@
   String getBigQuerySchemaPath();
 
   void setBigQuerySchemaPath(String path);
+
+  @TemplateParameter.BigQueryTable(
+      order = 21,
+      optional = true,
+      description =
+          "Table for messages that failed to reach the output table (i.e., Deadletter table) when using Storage Write API",
+      helpText =
+          "The BigQuery table to use for messages that failed to reach the output table, "
+              + "formatted as `\"PROJECT_ID:DATASET_NAME.TABLE_NAME\"`. If the table "
+              + "doesn't exist, it is created when the pipeline runs. "
+              + "If this parameter is not specified, the pipeline will fail on write errors. "
+              + "This parameter can only be specified if `useStorageWriteApi` or `useStorageWriteApiAtLeastOnce` is set to true.")
+  String getOutputDeadletterTable();
+
+  void setOutputDeadletterTable(String value);
 }

diff --git a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java
index ce34c3e0ac..04de8da1de 100644
--- a/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java
+++ b/v2/jdbc-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java
@@ -15,28 +15,45 @@
  */
 package com.google.cloud.teleport.v2.templates;
 
+import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
 import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
 import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
 
+import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.teleport.metadata.Template;
 import com.google.cloud.teleport.metadata.TemplateCategory;
+import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
 import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
 import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
+import com.google.cloud.teleport.v2.transforms.ErrorConverters;
 import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
 import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
 import com.google.cloud.teleport.v2.utils.JdbcConverters;
+import com.google.cloud.teleport.v2.utils.ResourceUtils;
 import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
+import com.google.cloud.teleport.v2.values.FailsafeElement;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.io.jdbc.JdbcIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
@@ -69,6 +86,10 @@
 })
 public class JdbcToBigQuery {
 
+  /** Coder for FailsafeElement. */
+  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
+      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
   /**
    * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
    * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
@@ -97,6 +118,12 @@ public static void main(String[] args) {
   static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
     // Validate BQ STORAGE_WRITE_API options
     BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
+    if (!options.getUseStorageWriteApi()
+        && !options.getUseStorageWriteApiAtLeastOnce()
+        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
+      throw new IllegalArgumentException(
+          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
+    }
 
     // Create the pipeline
     Pipeline pipeline = Pipeline.create(options);
@@ -175,12 +202,71 @@ static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeTo
     /*
      * Step 2: Append TableRow to an existing BigQuery table
      */
-    rows.apply("Write to BigQuery", writeToBQ);
+    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);
+
+    /*
+     * Step 3.
+     * If using Storage Write API, capture failed inserts and either
+     *   a) write error rows to DLQ
+     *   b) fail the pipeline
+     */
+    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
+      PCollection<BigQueryInsertError> insertErrors =
+          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);
+
+      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
+        /*
+         * Step 3a.
+         * Elements that failed to insert into BigQuery are extracted and converted to FailsafeElement
+         */
+        PCollection<FailsafeElement<String, String>> failedInserts =
+            insertErrors
+                .apply(
+                    "WrapInsertionErrors",
+                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
+                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
+                .setCoder(FAILSAFE_ELEMENT_CODER);
+
+        /*
+         * Step 3a Contd.
+         * Write records that failed insertion to the deadletter table
+         */
+        failedInserts.apply(
+            "WriteFailedRecords",
+            ErrorConverters.WriteStringMessageErrors.newBuilder()
+                .setErrorRecordsTable(options.getOutputDeadletterTable())
+                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
+                .setUseWindowedTimestamp(false)
+                .build());
+      } else {
+        /*
+         * Step 3b.
+         * Fail pipeline upon write errors if no DLQ was specified
+         */
+        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
+      }
+    }
 
     // Execute the pipeline and return the result.
     return pipeline.run();
   }
 
+  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
+      List<String> errorMessages =
+          insertError.getError().getErrors().stream()
+              .map(ErrorProto::getMessage)
+              .collect(Collectors.toList());
+      String stackTrace = String.join("\nCaused by:", errorMessages);
+
+      throw new IllegalStateException(
+          String.format(
+              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
+    }
+  }
+
   /**
    * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
    */
@@ -198,12 +284,16 @@ static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
                 : Write.WriteDisposition.WRITE_APPEND)
         .withCustomGcsTempLocation(
             StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
+        .withExtendedErrorInfo()
         .to(options.getOutputTable());
 
     if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
         != Write.CreateDisposition.CREATE_NEVER) {
       write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
     }
+    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
+      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
+    }
 
     return write;
   }

diff --git a/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java b/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java
index 3f8d0f5fc0..118058369a 100644
--- a/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java
+++ b/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java
@@ -66,6 +66,8 @@ public class JdbcToBigQueryIT extends JDBCBaseIT {
 
   private static final Logger LOG = LoggerFactory.getLogger(JdbcToBigQueryIT.class);
 
+  private static final Integer NUM_ROWS = 100;
+
   private static final String ROW_ID = "row_id";
   private static final String NAME = "name";
   private static final String FULL_NAME = "full_name";
@@ -73,6 +75,7 @@ public class JdbcToBigQueryIT extends JDBCBaseIT {
   private static final String MEMBER = "member";
   private static final String IS_MEMBER = "is_member";
   private static final String ENTRY_ADDED = "entry_added";
+  private static final String FAKE = "FAKE";
 
   private static final String KMS_REGION = "global";
   private static final String KEYRING_ID = "JDBCToBigQuery";
@@ -125,6 +128,7 @@ public void testMySqlToBigQueryFlex() throws IOException {
         mySqlDriverGCSPath(),
         mySQLResourceManager,
         true,
+        false,
         config ->
             config.addParameter(
                 "query",
                 + testName));
   }
 
+  @Test
+  public void testMySqlToBigQueryFlexWithDlq() throws IOException {
+    // Create MySQL Resource manager
+    mySQLResourceManager = MySQLResourceManager.builder(testName).build();
+
+    // Arrange MySQL-compatible schema
+    HashMap<String, String> columns = new HashMap<>();
+    columns.put(ROW_ID, "NUMERIC NOT NULL");
+    columns.put(NAME, "VARCHAR(200)");
+    columns.put(AGE, "NUMERIC");
+    columns.put(MEMBER, "VARCHAR(200)");
+    columns.put(ENTRY_ADDED, "VARCHAR(200)");
+    columns.put(FAKE, "VARCHAR(200)");
+    JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
+
+    // Run a simple IT
+    simpleJdbcToBigQueryTest(
+        testName,
+        schema,
+        MYSQL_DRIVER,
+        mySqlDriverGCSPath(),
+        mySQLResourceManager,
+        true,
+        true,
+        config ->
+            config
.addParameter("query", "select * from " + testName) + .addParameter("useStorageWriteApi", "true")); + } + @Test public void testMySqlToBigQueryWithStorageWriteApi() throws IOException { // Create MySQL Resource manager @@ -156,6 +190,7 @@ public void testMySqlToBigQueryWithStorageWriteApi() throws IOException { mySqlDriverGCSPath(), mySQLResourceManager, true, + false, config -> config .addParameter( @@ -186,6 +221,7 @@ public void testPostgresToBigQueryFlex() throws IOException { postgresDriverGCSPath(), postgresResourceManager, true, + false, config -> config.addParameter( "query", @@ -220,6 +256,7 @@ public void testPostgresWithUnicodeCharactersInQuery() throws IOException { postgresDriverGCSPath(), postgresResourceManager, true, + false, config -> config.addParameter("query", getGcsPath("input/query.sql"))); } @@ -251,6 +288,7 @@ public void testOracleToBigQueryFlex() throws IOException { oracleDriverGCSPath(), oracleResourceManager, true, + false, config -> config.addParameter( "query", @@ -280,6 +318,7 @@ public void testMsSqlToBigQueryFlex() throws IOException { msSqlDriverGCSPath(), msSQLResourceManager, true, + false, config -> config.addParameter( "query", @@ -307,6 +346,7 @@ public void testReadWithPartitions() throws IOException { postgresDriverGCSPath(), postgresResourceManager, false, + false, config -> config.addParameter("table", testName).addParameter("partitionColumn", ROW_ID)); } @@ -317,6 +357,7 @@ private void simpleJdbcToBigQueryTest( String driverJars, JDBCResourceManager jdbcResourceManager, boolean useColumnAlias, + boolean useDlq, Function paramsAdder) throws IOException { simpleJdbcToBigQueryTest( @@ -328,6 +369,7 @@ private void simpleJdbcToBigQueryTest( driverJars, jdbcResourceManager, useColumnAlias, + useDlq, paramsAdder); } @@ -340,12 +382,16 @@ private void simpleJdbcToBigQueryTest( String driverJars, JDBCResourceManager jdbcResourceManager, boolean useColumnAlias, + boolean useDlq, Function paramsAdder) throws IOException { // Arrange - List> jdbcData = - getJdbcData(List.of(ROW_ID, NAME, AGE, MEMBER, ENTRY_ADDED)); + List columns = new ArrayList<>(List.of(ROW_ID, NAME, AGE, MEMBER, ENTRY_ADDED)); + if (useDlq) { + columns.add(FAKE); + } + List> jdbcData = getJdbcData(columns, useDlq); jdbcResourceManager.createTable(sourceTableName, schema); jdbcResourceManager.write(sourceTableName, jdbcData); @@ -379,7 +425,10 @@ private void simpleJdbcToBigQueryTest( .addParameter("useColumnAlias", "true") .addParameter("fetchSize", "100000") .addParameter("connectionProperties", "characterEncoding=UTF-8") - .addParameter("disabledAlgorithms", "SSLv3, GCM")); + .addParameter("disabledAlgorithms", "SSLv3, GCM") + .addParameter( + "outputDeadletterTable", + useDlq ? 
toTableSpecLegacy(table) + "_error_records" : "")); // Act PipelineLauncher.LaunchInfo info = launchTemplate(options); @@ -397,8 +446,15 @@ private void simpleJdbcToBigQueryTest( row.put("is_member", row.remove("member")); }); } - assertThatBigQueryRecords(bigQueryResourceManager.readTable(targetTableName)) - .hasRecordsUnorderedCaseInsensitiveColumns(jdbcData); + if (useDlq) { + assertThatBigQueryRecords(bigQueryResourceManager.readTable(targetTableName)).hasRows(0); + assertThatBigQueryRecords( + bigQueryResourceManager.readTable(targetTableName + "_error_records")) + .hasRows(NUM_ROWS); + } else { + assertThatBigQueryRecords(bigQueryResourceManager.readTable(targetTableName)) + .hasRecordsUnorderedCaseInsensitiveColumns(jdbcData); + } } /** @@ -407,15 +463,18 @@ private void simpleJdbcToBigQueryTest( * @param columns List of column names. * @return A map containing the rows of data to be stored in each JDBC table. */ - private List> getJdbcData(List columns) { + private List> getJdbcData(List columns, boolean useDlq) { List> data = new ArrayList<>(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < NUM_ROWS; i++) { Map values = new HashMap<>(); values.put(columns.get(0), i); values.put(columns.get(1), RandomStringUtils.randomAlphabetic(10)); values.put(columns.get(2), new Random().nextInt(100)); values.put(columns.get(3), i % 2 == 0 ? "Y" : "N"); values.put(columns.get(4), Instant.now().toString()); + if (useDlq) { + values.put(columns.get(5), RandomStringUtils.randomAlphabetic(10)); + } data.add(values); } From 45ae79e408e63723c9971c1af912a6ad8dca39c3 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 20 Nov 2024 14:31:20 -0500 Subject: [PATCH 2/2] fix test failures Signed-off-by: Jeffrey Kinard --- .../cloud/teleport/v2/transforms/ErrorConverters.java | 3 ++- .../cloud/teleport/v2/templates/JdbcToBigQueryIT.java | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java index 51a294ca8d..9f6b72a525 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ErrorConverters.java @@ -77,7 +77,8 @@ public abstract static class WriteStringMessageErrors extends PTransform>, WriteResult> { public static Builder newBuilder() { - return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder(); + return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder() + .setUseWindowedTimestamp(true); } public abstract String getErrorRecordsTable(); diff --git a/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java b/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java index 118058369a..f95475c0e9 100644 --- a/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java +++ b/v2/jdbc-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/JdbcToBigQueryIT.java @@ -425,10 +425,10 @@ private void simpleJdbcToBigQueryTest( .addParameter("useColumnAlias", "true") .addParameter("fetchSize", "100000") .addParameter("connectionProperties", "characterEncoding=UTF-8") - .addParameter("disabledAlgorithms", "SSLv3, GCM") - .addParameter( - "outputDeadletterTable", - useDlq ? 
toTableSpecLegacy(table) + "_error_records" : "")); + .addParameter("disabledAlgorithms", "SSLv3, GCM")); + if (useDlq) { + options.addParameter("outputDeadletterTable", toTableSpecLegacy(table) + "_error_records"); + } // Act PipelineLauncher.LaunchInfo info = launchTemplate(options);
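
Usage note (appended to the series, not part of either patch): with both patches
applied, outputDeadletterTable is honored only when the Storage Write API is active;
otherwise run() rejects the job with the IllegalArgumentException added in PATCH 1.
A launch would look roughly like the sketch below. The job name, bucket, project,
dataset, and table values are placeholders, and the template spec location depends on
where the Flex Template was staged:

    gcloud dataflow flex-template run jdbc-to-bq-with-dlq \
        --region us-central1 \
        --template-file-gcs-location gs://MY_BUCKET/templates/flex/Jdbc_to_BigQuery_Flex \
        --parameters driverClassName=com.mysql.jdbc.Driver \
        --parameters driverJars=gs://MY_BUCKET/drivers/mysql-connector-java.jar \
        --parameters connectionURL="jdbc:mysql://HOST:3306/DATABASE" \
        --parameters query="select * from source_table" \
        --parameters outputTable=PROJECT_ID:DATASET.output_table \
        --parameters bigQueryLoadingTemporaryDirectory=gs://MY_BUCKET/tmp \
        --parameters useStorageWriteApi=true \
        --parameters outputDeadletterTable=PROJECT_ID:DATASET.output_table_error_records

Rows that fail insertion are first retried for transient errors (PATCH 1 sets
InsertRetryPolicy.retryTransientErrors()) and then written to the deadletter table
using the schema from ResourceUtils.getDeadletterTableSchemaJson(), stamped with
Instant.now() because the template builds the error writer with
setUseWindowedTimestamp(false).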
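Note on the PATCH 2 ErrorConverters change: useWindowedTimestamp is a primitive
AutoValue property, so a builder chain that never calls setUseWindowedTimestamp(...)
fails at build() with an IllegalStateException naming the missing property. Seeding
the default inside newBuilder() keeps every pre-existing caller working with the old
windowed behavior. A minimal sketch of such a call site follows; the class and
variable names are illustrative only, not code from the repository:

    import com.google.cloud.teleport.v2.transforms.ErrorConverters;
    import com.google.cloud.teleport.v2.utils.ResourceUtils;
    import com.google.cloud.teleport.v2.values.FailsafeElement;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.values.PCollection;

    /** Hypothetical pre-existing call site that never sets useWindowedTimestamp. */
    class DlqWriteExample {
      static WriteResult writeToDlq(
          PCollection<FailsafeElement<String, String>> failedRecords, String dlqTable) {
        return failedRecords.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(dlqTable)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                // No setUseWindowedTimestamp(...) call here: without the default seeded
                // in newBuilder() by PATCH 2, AutoValue's generated build() would throw
                // IllegalStateException: Missing required properties: useWindowedTimestamp.
                .build());
      }
    }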