From b4c3a4ff927c052d503378aeb1584c82fe457c47 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:43:20 +0000 Subject: [PATCH] [BigQueryIO] fetch updated schema for newly created Storage API stream writers (#33231) * add dynamic dest test * fix and add some tests * add to changes.md * fix whitespace * trigger postcommits * address comments --- .../beam_PostCommit_Java_DataflowV2.json | 2 +- CHANGES.md | 1 + .../sdk/io/gcp/bigquery/BigQueryServices.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 13 +- .../StorageApiWriteUnshardedRecords.java | 20 +- .../StorageApiWritesShardedRecords.java | 22 ++ .../io/gcp/testing/FakeDatasetService.java | 4 +- .../StorageApiSinkSchemaUpdateIT.java | 206 ++++++++++++++++-- 8 files changed, 236 insertions(+), 34 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 3f63c0c9975f..bbdc3a3910ef 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index d5cbb76fb3d5..ac3992a872de 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -79,6 +79,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 374288de6562..8dbc47359b70 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -213,7 +213,7 @@ WriteStream createWriteStream(String tableUrn, WriteStream.Type type) throws IOException, InterruptedException; @Nullable - WriteStream getWriteStream(String writeStream); + TableSchema getWriteStreamSchema(String writeStream); /** * Create an append client for a given Storage API write stream. The stream must be created diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 432f31e81c90..0688694fb9be 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -76,6 +76,7 @@ import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; import com.google.cloud.bigquery.storage.v1.FlushRowsRequest; import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.ProtoSchema; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; @@ -86,6 +87,7 @@ import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.storage.v1.WriteStreamView; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.protobuf.DescriptorProtos; @@ -1418,8 +1420,15 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type) } @Override - public @Nullable WriteStream getWriteStream(String writeStream) { - return newWriteClient.getWriteStream(writeStream); + public @Nullable TableSchema getWriteStreamSchema(String writeStream) { + @Nullable + WriteStream stream = + newWriteClient.getWriteStream( + GetWriteStreamRequest.newBuilder() + .setView(WriteStreamView.FULL) + .setName(writeStream) + .build()); + return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null; } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 30a9ecd274dc..1b5f32f7e431 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -29,7 +29,6 @@ import com.google.cloud.bigquery.storage.v1.Exceptions; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; @@ -475,15 +474,18 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u () -> { if (autoUpdateSchema) { @Nullable - WriteStream writeStream = + TableSchema streamSchema = Preconditions.checkStateNotNull(maybeWriteStreamService) - .getWriteStream(streamName); - if (writeStream != null && writeStream.hasTableSchema()) { - TableSchema updatedFromStream = writeStream.getTableSchema(); - currentSchema.set(updatedFromStream); - updated.set(true); - LOG.debug( - "Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream); + .getWriteStreamSchema(streamName); + if (streamSchema != null) { + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema); + if (newSchema.isPresent()) { + currentSchema.set(newSchema.get()); + updated.set(true); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get()); + } } } return null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 738a52b69cb7..d905c4bf93ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -531,6 +531,28 @@ public void process( element.getKey().getKey(), dynamicDestinations, datasetService); tableSchema = converter.getTableSchema(); descriptor = converter.getDescriptor(false); + + if (autoUpdateSchema) { + // A StreamWriter ignores table schema updates that happen prior to its creation. + // So before creating a StreamWriter below, we fetch the table schema to check if we + // missed an update. + // If so, use the new schema instead of the base schema + @Nullable + TableSchema streamSchema = + MoreObjects.firstNonNull( + writeStreamService.getWriteStreamSchema(getOrCreateStream.get()), + TableSchema.getDefaultInstance()); + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema); + + if (newSchema.isPresent()) { + tableSchema = newSchema.get(); + descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + tableSchema, true, false); + updatedSchema.write(tableSchema); + } + } } AppendClientInfo info = AppendClientInfo.of( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index a99e4bea37a7..24229643eca3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -590,11 +590,11 @@ public WriteStream createWriteStream(String tableUrn, Type type) throws Interrup @Override @Nullable - public WriteStream getWriteStream(String streamName) { + public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) { synchronized (FakeDatasetService.class) { @Nullable Stream stream = writeStreams.get(streamName); if (stream != null) { - return stream.toWriteStream(); + return stream.toWriteStream().getTableSchema(); } } // TODO(relax): Return the exact error that BigQuery returns. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index ae9e9cefb150..3118e97b2b93 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -19,6 +19,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; @@ -33,7 +35,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.function.Function; @@ -60,6 +64,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.joda.time.Duration; @@ -124,6 +129,9 @@ public static Iterable data() { private static final int TOTAL_N = 70; // Number of rows with the original schema private static final int ORIGINAL_N = 60; + // for dynamic destination test + private static final int NUM_DESTINATIONS = 3; + private static final int TOTAL_NUM_STREAMS = 9; private final Random randomGenerator = new Random(); @@ -145,6 +153,11 @@ public static void cleanUp() { } private String createTable(TableSchema tableSchema) throws IOException, InterruptedException { + return createTable(tableSchema, ""); + } + + private String createTable(TableSchema tableSchema, String suffix) + throws IOException, InterruptedException { String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0); if (useInputSchema) { tableId += "WithInputSchema"; @@ -152,6 +165,8 @@ private String createTable(TableSchema tableSchema) throws IOException, Interrup if (changeTableSchema) { tableId += "OnSchemaChange"; } + tableId += suffix; + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId); BQ_CLIENT.createNewTable( PROJECT, @@ -170,9 +185,8 @@ static class UpdateSchemaDoFn extends DoFn, TableRow> { private final String projectId; private final String datasetId; - private final String tableId; // represent as String because TableSchema is not serializable - private final String newSchema; + private final Map newSchemas; private transient BigqueryClient bqClient; @@ -183,11 +197,14 @@ static class UpdateSchemaDoFn extends DoFn, TableRow> { private final StateSpec> counter; public UpdateSchemaDoFn( - String projectId, String datasetId, String tableId, TableSchema newSchema) { + String projectId, String datasetId, Map newSchemas) { this.projectId = projectId; this.datasetId = datasetId; - this.tableId = tableId; - this.newSchema = BigQueryHelpers.toJsonString(newSchema); + Map serializableSchemas = new HashMap<>(); + for (Map.Entry entry : newSchemas.entrySet()) { + serializableSchemas.put(entry.getKey(), BigQueryHelpers.toJsonString(entry.getValue())); + } + this.newSchemas = serializableSchemas; this.bqClient = null; this.counter = StateSpecs.value(); } @@ -201,14 +218,17 @@ public void setup() { public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState counter) throws Exception { int current = firstNonNull(counter.read(), 0); - // We update schema early on to leave a healthy amount of time for - // StreamWriter to recognize it. - if (current == 10) { - bqClient.updateTableSchema( - projectId, - datasetId, - tableId, - BigQueryHelpers.fromJsonString(newSchema, TableSchema.class)); + // We update schema early on to leave a healthy amount of time for StreamWriter to recognize + // it. + // We also update halfway through so that some writers are created *after* the schema update + if (current == TOTAL_NUM_STREAMS / 2) { + for (Map.Entry entry : newSchemas.entrySet()) { + bqClient.updateTableSchema( + projectId, + datasetId, + entry.getKey(), + BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class)); + } } counter.write(++current); @@ -304,7 +324,7 @@ private void runStreamingPipelineWithSchemaChange( p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0); // Limit parallelism so that all streams recognize the new schema in an expected short amount // of time (before we start writing rows with updated schema) - p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3); + p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS); // Need to manually enable streaming engine for legacy dataflow runner ExperimentalOptions.addExperiment( p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); @@ -394,7 +414,8 @@ private void runStreamingPipelineWithSchemaChange( .apply( "Update Schema", ParDo.of( - new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema))); + new UpdateSchemaDoFn( + PROJECT, BIG_QUERY_DATASET_ID, ImmutableMap.of(tableId, updatedSchema)))); } WriteResult result = rows.apply("Stream to BigQuery", write); if (useIgnoreUnknownValues) { @@ -494,13 +515,13 @@ public void checkRowsWithUpdatedSchema( if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N || !useAutoSchemaUpdate || !changeTableSchema) { - assertTrue( + assertNull( String.format("Expected row to NOT have field %s:\n%s", extraField, row), - row.get(extraField) == null); + row.get(extraField)); } else { - assertTrue( + assertNotNull( String.format("Expected row to have field %s:\n%s", extraField, row), - row.get(extraField) != null); + row.get(extraField)); } } } @@ -539,4 +560,151 @@ public void testAtLeastOnceWithIgnoreUnknownValues() throws Exception { public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception { runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true); } + + public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) throws Exception { + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + // 0 threshold so that the stream tries fetching an updated schema after each append + p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0); + // Total streams per destination + p.getOptions() + .as(BigQueryOptions.class) + .setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS / NUM_DESTINATIONS); + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); + // Only run the most relevant test case on Dataflow + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue( + "Skipping in favor of more relevant test case", changeTableSchema && useInputSchema); + } + + List fieldNamesOrigin = new ArrayList(Arrays.asList(FIELDS)); + + // Shuffle the fields in the write schema to do fuzz testing on field order + List fieldNamesShuffled = new ArrayList(fieldNamesOrigin); + Collections.shuffle(fieldNamesShuffled, randomGenerator); + TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null); + TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null); + + Map destinations = new HashMap<>(NUM_DESTINATIONS); + Map updatedSchemas = new HashMap<>(NUM_DESTINATIONS); + Map extraFields = new HashMap<>(NUM_DESTINATIONS); + Map rowFuncs = new HashMap<>(NUM_DESTINATIONS); + for (int i = 0; i < NUM_DESTINATIONS; i++) { + // The updated schema includes all fields in the original schema plus a random new field + List fieldNamesWithExtra = new ArrayList(fieldNamesOrigin); + String extraField = + fieldNamesOrigin.get(randomGenerator.nextInt(fieldNamesOrigin.size())) + "_EXTRA"; + fieldNamesWithExtra.add(extraField); + TableSchema updatedSchema = + makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField)); + GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra); + + String tableId = createTable(bqTableSchema, "_dynamic_" + i); + String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId; + + rowFuncs.put((long) i, generateRowFunc); + destinations.put((long) i, tableSpec); + updatedSchemas.put(tableId, updatedSchema); + extraFields.put(tableSpec, extraField); + } + + // build write transform + Write write = + BigQueryIO.writeTableRows() + .to( + row -> { + long l = (int) row.getValue().get("id") % NUM_DESTINATIONS; + String destination = destinations.get(l); + return new TableDestination(destination, null); + }) + .withAutoSchemaUpdate(true) + .ignoreUnknownValues() + .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + if (useInputSchema) { + write = write.withSchema(inputSchema); + } + if (!useAtLeastOnce) { + write = + write + .withMethod(Write.Method.STORAGE_WRITE_API) + .withTriggeringFrequency(Duration.standardSeconds(1)); + } + + int numRows = TOTAL_N; + // set up and build pipeline + Instant start = new Instant(0); + // We give a healthy waiting period between each element to give Storage API streams a chance to + // recognize the new schema. Apply on relevant tests. + Duration interval = changeTableSchema ? Duration.standardSeconds(1) : Duration.millis(1); + Duration stop = + changeTableSchema ? Duration.standardSeconds(numRows - 1) : Duration.millis(numRows - 1); + Function getIdFromInstant = + changeTableSchema + ? (Function & Serializable) + (Instant instant) -> instant.getMillis() / 1000 + : (Function & Serializable) Instant::getMillis; + + // Generates rows with original schema up for row IDs under ORIGINAL_N + // Then generates rows with updated schema for the rest + // Rows with updated schema should only reach the table if ignoreUnknownValues is set, + // and the extra field should be present only when autoSchemaUpdate is set + PCollection instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(stop)) + .withInterval(interval) + .catchUpToNow(false)); + PCollection rows = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via( + instant -> { + long rowId = getIdFromInstant.apply(instant); + long dest = rowId % NUM_DESTINATIONS; + return rowFuncs.get(dest).apply(rowId); + })); + if (changeTableSchema) { + rows = + rows + // UpdateSchemaDoFn uses state, so need to have a KV input + .apply("Add a dummy key", WithKeys.of(1)) + .apply( + "Update Schema", + ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, updatedSchemas))); + } + + WriteResult result = rows.apply("Stream to BigQuery", write); + // We ignore the extra fields, so no rows should have been sent to DLQ + PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty(); + p.run().waitUntilFinish(); + + Map expectedCounts = new HashMap<>(NUM_DESTINATIONS); + for (int i = 0; i < numRows; i++) { + long mod = i % NUM_DESTINATIONS; + String destination = destinations.get(mod); + expectedCounts.merge(destination, 1, Integer::sum); + } + + for (Map.Entry expectedCount : expectedCounts.entrySet()) { + String dest = expectedCount.getKey(); + checkRowCompleteness(dest, expectedCount.getValue(), true); + checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true); + } + } + + @Test + public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + runDynamicDestinationsWithAutoSchemaUpdate(false); + } + + @Test + public void testAtLeastOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception { + runDynamicDestinationsWithAutoSchemaUpdate(true); + } }