From 172eb1b9a65193576c8539bce95c6bd42b8c5016 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Wed, 21 Jun 2023 15:57:40 -0400
Subject: [PATCH 1/5] Add argument checks and tests for BQ StorageAPI sinks.
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 39 +- .../sdk/io/gcp/testing/BigqueryClient.java | 12 + .../StorageApiSinkSchemaChangeIT.java | 450 ++++++++++++++++++ 3 files changed, 497 insertions(+), 4 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8ef39ca9cf94..351bc1bc5823 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3141,13 +3141,34 @@ public WriteResult expand(PCollection<T> input) { "When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, " + "triggering frequency must be specified"); } else { - checkArgument( - getTriggeringFrequency() == null && getNumFileShards() == 0, + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + Duration triggeringFrequency = + (method == Write.Method.STORAGE_WRITE_API) + ? getStorageApiTriggeringFrequency(bqOptions) + : getTriggeringFrequency(); + + checkArgument(triggeringFrequency == null, "Triggering frequency or number of file shards can be specified only when writing an" + " unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection" + " was %s and the method was %s", input.isBounded(), method); + + checkArgument( + (getNumFileShards() == 0), + "Number of file shards can be specified only when writing an" + + " unbounded PCollection via FILE_LOADS, but: the collection" + + " was %s and the method was %s", + input.isBounded(), + method); + + checkArgument( + (getNumStorageWriteApiStreams() == 0), + "Number of storage write api streams can be specified only when writing an" + + " unbounded PCollection via STORAGE_API_WRITES, but: the collection" + + " was %s and the method was %s", + input.isBounded(), + method); } if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
@@ -3172,8 +3193,18 @@ public WriteResult expand(PCollection<T> input) { !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas."); } - if (input.isBounded() == IsBounded.BOUNDED) { - checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input."); + if (method == Method.STORAGE_WRITE_API && input.isBounded() == IsBounded.UNBOUNDED) { + if (getNumStorageWriteApiStreams() > 0) { + checkArgument(!getAutoSharding(), "withAutoSharding only supported when" + + " numStorageWriteApiStream is zero or not set. Currently it is set to %s.", + getNumStorageWriteApiStreams()); + } + } else { + checkArgument(!getAutoSharding(), "withAutoSharding only supported" + + " when writing an unbounded PCollection via STORAGE_API_WRITES.
but: the collection" + " was %s and the method was %s", input.isBounded(), method); } if (getJsonTimePartitioning() != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
index 584d1c7123da..ab8b59265f0b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
@@ -570,4 +570,16 @@ public Table getTableResource(String projectId, String datasetId, String tableId MAX_QUERY_RETRIES, tableId), lastException); } + + public void updateTableSchema(String projectId, String datasetId, String tableId, + TableSchema newSchema) { + try { + this.bqClient.tables() + .patch(projectId, datasetId, tableId, + new Table().setSchema(newSchema)).execute(); + LOG.info("Successfully updated the schema of table: " + tableId); + } catch (Exception e) { + LOG.debug("Exceptions caught when updating table schema: " + e.getMessage()); + } + } }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
new file mode 100644
index 000000000000..5f76f524bc34
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
@@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StorageApiSinkSchemaChangeIT { + + private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkSchemaChangeIT.class); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient( + "StorageApiSinkSchemaChangeIT"); + private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class) + .getProject(); + private static final String BIG_QUERY_DATASET_ID = + "storage_api_sink_schema_change" + System.nanoTime(); + + private static final String[] FIELDS = {"BOOL", "BOOLEAN", "BYTES", "INT64", "INTEGER", "FLOAT", + "FLOAT64", "NUMERIC", "STRING", "DATE", "TIMESTAMP"}; + + private static final int MAX_N = 50; + + private static final long RANDOM_SEED = 1; + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases.
+ BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @AfterClass + public static void cleanup() { + LOG.info("Start to clean up tables and datasets."); + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + private static String createTable(TableSchema tableSchema) + throws IOException, InterruptedException { + String tableId = "table" + System.nanoTime(); + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId); + BQ_CLIENT.createNewTable(PROJECT, BIG_QUERY_DATASET_ID, new Table().setSchema(tableSchema) + .setTableReference( + new TableReference().setTableId(tableId).setDatasetId(BIG_QUERY_DATASET_ID) + .setProjectId(PROJECT))); + return tableId; + } + + private static boolean checkRowCompleteness(String tableSpec) + throws IOException, InterruptedException { + TableRow queryResponse = Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(DISTINCT(id)), MIN(id), MAX(id) FROM %s", tableSpec), PROJECT, + true, true)); + + int distinctCount = Integer.parseInt((String) queryResponse.get("f0_")); + int rangeMin = Integer.parseInt((String) queryResponse.get("f1_")); + int rangeMax = Integer.parseInt((String) queryResponse.get("f2_")); + + LOG.info("total distinct count = {}, min = {}, max = {}", distinctCount, rangeMin, rangeMax); + + return (rangeMax - rangeMin + 1) == distinctCount && distinctCount == MAX_N; + } + + private static boolean checkRowDuplication(String tableSpec) + throws IOException, InterruptedException { + TableRow queryResponse = Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(DISTINCT(id)), COUNT(id) FROM %s", tableSpec), PROJECT, true, + true)); + + int distinctCount = Integer.parseInt((String) queryResponse.get("f0_")); + int totalCount = Integer.parseInt((String) queryResponse.get("f1_")); + + LOG.info("total distinct count = {}, total count = {}", distinctCount, totalCount); + + return distinctCount == totalCount; + } + + static class UpdateSchemaDoFn extends DoFn<KV<Integer, Long>, Long> { + + private final String projectId; + private final String datasetId; + private final String tableId; + + private final String schemaString; + + private transient BigqueryClient BQ_CLIENT; + + private static final String MY_COUNTER = "myCounter"; + + @StateId(MY_COUNTER) + @SuppressWarnings("unused") + private final StateSpec<@org.jetbrains.annotations.NotNull ValueState<Integer>> counter; + + public UpdateSchemaDoFn(String projectId, String datasetId, String tableId, + TableSchema schema) { + this.projectId = projectId; + this.datasetId = datasetId; + this.tableId = tableId; + this.schemaString = BigQueryHelpers.toJsonString(schema); + this.BQ_CLIENT = null; + this.counter = StateSpecs.value(); + } + + private int getRowCount(String tableSpec) throws IOException, InterruptedException { + TableRow queryResponse = Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened(String.format("SELECT COUNT(*) FROM %s", tableSpec), PROJECT, + true, true)); + return Integer.parseInt((String) queryResponse.get("f0_")); + } + + @Setup + public void setup() { + BQ_CLIENT = new BigqueryClient("StorageApiSinkSchemaChangeIT_UpdateSchema"); + } + + @Teardown + public void tearDown() { + return; + } + + @ProcessElement + public void processElement(ProcessContext c, @StateId(MY_COUNTER) ValueState<Integer> counter) + throws InterruptedException { + int current = firstNonNull(counter.read(), 0); + if (current == 0) { + int rowCount = 0; + try { + rowCount = this.getRowCount(this.projectId + "." + this.datasetId + "."
+ this.tableId); + } catch (Exception e) { + LOG.error(e.toString()); + } + LOG.info("checking # of rows in BQ: {}", rowCount); + if (rowCount > 0) { + BQ_CLIENT.updateTableSchema(this.projectId, this.datasetId, this.tableId, + BigQueryHelpers.fromJsonString(this.schemaString, TableSchema.class)); + + Thread.sleep(5000); + counter.write(1); + } + } + + c.output(Objects.requireNonNull(c.element()).getValue()); + } + } + + static class GenerateRowFunc implements SerializableFunction<Long, TableRow> { + + private final List<String> fieldNames; + + public GenerateRowFunc(List<String> fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public TableRow apply(Long rowId) { + LOG.info("Generating row #{}", rowId); + TableRow row = new TableRow(); + row.set("id", rowId); + + for (String name : fieldNames) { + String type = Iterables.get(Splitter.on('_').split(name), 0); + switch (type) { + case "BOOL": + case "BOOLEAN": + if (rowId % 2 == 0) { + row.set(name, false); + } else { + row.set(name, true); + } + break; + case "BYTES": + row.set(name, "test_blob".getBytes(StandardCharsets.UTF_8)); + break; + case "INT64": + case "INTEGER": + row.set(name, rowId + 10); + break; + case "FLOAT": + case "FLOAT64": + row.set(name, 0.5 + rowId); + break; + case "NUMERIC": + row.set(name, rowId + 0.12345); + break; + case "DATE": + row.set(name, "2022-01-01"); + break; + case "TIMESTAMP": + row.set(name, "2022-01-01T10:10:10.012Z"); + break; + case "STRING": + row.set(name, "test_string" + rowId); + break; + default: + row.set(name, "unknown" + rowId); + break; + } + } + return row; + } + } + + private static TableSchema makeTableSchemaFromTypes(List<String> fieldNames, + Set<String> nullableFieldNames) { + ImmutableList.Builder<TableFieldSchema> builder = ImmutableList.builder(); + + // Add an id field for verification of correctness + builder.add(new TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED")); + + // the name is prefixed with type_.
+ for (String name : fieldNames) { + String type = Iterables.get(Splitter.on('_').split(name), 0); + String mode = "REQUIRED"; + if (nullableFieldNames != null && nullableFieldNames.contains(name)) { + mode = "NULLABLE"; + } + builder.add(new TableFieldSchema().setType(type).setName(name).setMode(mode)); + } + + return new TableSchema().setFields(builder.build()); + } + + private static void runStreamingPipelineWithSchemaChange(Write.Method method, + boolean useAutoSchemaUpdate, int triggeringFreq, boolean useAutoSharding, int numShards, + boolean useIgnoreUnknownValues) throws IOException, InterruptedException { + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + BigQueryOptions bqOptions = pipelineOptions.as(BigQueryOptions.class); + bqOptions.setStorageApiAppendThresholdRecordCount(1); + + Pipeline p = Pipeline.create(pipelineOptions); + + List<String> fieldNamesOrigin = new ArrayList<>(Arrays.asList(FIELDS)); + + // Shuffle the fields in the write schema to do fuzz testing on field order + List<String> fieldNamesShuffled = new ArrayList<>(fieldNamesOrigin); + Collections.shuffle(fieldNamesShuffled, new Random(RANDOM_SEED)); + + // The updated schema includes all fields in the original schema plus a random new field + List<String> fieldNamesWithExtra = new ArrayList<>(fieldNamesOrigin); + Random r = new Random(RANDOM_SEED); + String extraField = fieldNamesOrigin.get(r.nextInt(fieldNamesOrigin.size())) + "_EXTRA"; + fieldNamesWithExtra.add(extraField); + + TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null); + LOG.info("original table schema: {}", BigQueryHelpers.toJsonString(bqTableSchema)); + + TableSchema bqWriteSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null); + LOG.info("write schema: {}", BigQueryHelpers.toJsonString(bqWriteSchema)); + + TableSchema bqTableSchemaUpdated = makeTableSchemaFromTypes(fieldNamesWithExtra, + ImmutableSet.of(extraField)); + LOG.info("updated table schema: {}", BigQueryHelpers.toJsonString(bqTableSchemaUpdated)); + + String tableId = createTable(bqTableSchema); + + String tableSpec = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + tableId; + + TestStream.Builder<Long> testStream = TestStream.create(VarLongCoder.of()) + .advanceWatermarkTo(new Instant(0)); + // These rows contain unknown fields, which should be dropped. + for (long i = 0; i < 5; i++) { + testStream = testStream.addElements(i); + testStream = testStream.advanceProcessingTime(Duration.standardSeconds(10)); + } + // Expire the timer, which should update the schema. + testStream = testStream.advanceProcessingTime(Duration.standardSeconds(100)); + // Add one element to trigger discovery of new schema.
+ for (long i = 5; i < MAX_N; i++) { + testStream = testStream.addElements(i); + } + + PCollection<Long> source = p.apply("Generate numbers", testStream.advanceWatermarkToInfinity()); + + PCollection<Long> input = source.apply("Add a dummy key", WithKeys.of(1)).apply("Update Schema", + ParDo.of( + new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, bqTableSchemaUpdated))); + + Write<Long> write = BigQueryIO.<Long>write().to(tableSpec) + .withAutoSchemaUpdate(useAutoSchemaUpdate).withSchema(bqWriteSchema) + .withFormatFunction(new GenerateRowFunc(fieldNamesOrigin)).withMethod(method) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + + if (useAutoSharding) { + write = write.withAutoSharding(); + } + + if (useIgnoreUnknownValues) { + write = write.ignoreUnknownValues(); + } + + if (triggeringFreq > 0) { + write = write.withTriggeringFrequency(Duration.standardSeconds(triggeringFreq)); + } + + if (numShards > 0) { + write = write.withNumStorageWriteApiStreams(numShards); + } + + input.apply("Stream to BigQuery", write); + + LOG.info("Start to run the pipeline ..."); + p.run().waitUntilFinish(); + + assertTrue(checkRowCompleteness(tableSpec)); + + if (method == Write.Method.STORAGE_WRITE_API) { + assertTrue(checkRowDuplication(tableSpec)); + } + } + + @Test + public void testWriteExactlyOnceOnSchemaChange() + throws IOException, InterruptedException { + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, 0, false); + } + + @Test + public void testWriteExactlyOnceOnSchemaChangeWithAutoSchemaUpdate() + throws IOException, InterruptedException { + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, true, 1, true, 0, true); + } + + @Test + public void testWriteAtLeastOnceOnSchemaChange() throws IOException, InterruptedException { + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 0, + false); + } + + @Test + public void testWriteAtLeastOnceOnSchemaChangeWithAutoSchemaUpdate() + throws IOException, InterruptedException { + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, 0, false, 0, + true); + } + + @Test + public void testExceptionOnWriteExactlyOnceWithZeroTriggeringFreq() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 0, true, + 0, false)); + } + + @Test + public void testExceptionOnWriteExactlyOnceWithBothNumShardsAndAutoSharding() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, + 1, false)); + } + + @Test + public void testExceptionOnAutoSchemaUpdateWithoutIgnoreUnknownValues() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, true, 1, + true, 0, + false)); + } + + @Test + public void testExceptionOnWriteAtLeastOnceWithTriggeringFreq() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 1, + true, 0, + false)); + } + + @Test + public void testExceptionOnWriteAtLeastOnceWithAutoSharding() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () ->
runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, + true, 0, + false)); + } + + @Test + public void testExceptionOnWriteAtLeastOnceWithNumShards() + throws IOException, InterruptedException { + assertThrows(IllegalArgumentException.class, + () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, + false, 1, + false)); + } +}

From 1def740bda923b5c7e33cfe26ca5ca00998f9af8 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 22 Jun 2023 00:38:04 -0400
Subject: [PATCH 2/5] Change some argument checks to log a warning message if the check fails.
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 68 +++++++++---------- .../StorageApiSinkSchemaChangeIT.java | 26 ++++--- 2 files changed, 49 insertions(+), 45 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 351bc1bc5823..35177aac4c15 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3141,34 +3141,32 @@ public WriteResult expand(PCollection<T> input) { "When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, " + "triggering frequency must be specified"); } else { - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - Duration triggeringFrequency = - (method == Write.Method.STORAGE_WRITE_API) - ? getStorageApiTriggeringFrequency(bqOptions) - : getTriggeringFrequency(); - - checkArgument(triggeringFrequency == null, - "Triggering frequency or number of file shards can be specified only when writing an" - + " unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection" - + " was %s and the method was %s", - input.isBounded(), - method); + checkArgument(getTriggeringFrequency() == null, + "Triggering frequency can be specified only when writing an unbounded PCollection via" + + " FILE_LOADS or STORAGE_API_WRITES, but the collection was %s and the method was" + + " %s.", input.isBounded(), method); + + if (method == Method.STORAGE_WRITE_API) { + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + if (getStorageApiTriggeringFrequency(bqOptions) != null) { + LOG.warn("The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored." + + " It is only supported when writing an unbounded PCollection via" + + " STORAGE_API_WRITES, but the collection was {} and the method was {}.", + input.isBounded(), method); + } + } checkArgument( (getNumFileShards() == 0), - "Number of file shards can be specified only when writing an" - + " unbounded PCollection via FILE_LOADS, but: the collection" - + " was %s and the method was %s", - input.isBounded(), + "Number of file shards can be specified only when writing an unbounded PCollection via" + + " FILE_LOADS, but the collection was %s and the method was %s", input.isBounded(), method); - checkArgument( - (getNumStorageWriteApiStreams() == 0), - "Number of storage write api streams can be specified only when writing an" - + " unbounded PCollection via STORAGE_API_WRITES, but: the collection" - + " was %s and the method was %s", - input.isBounded(), - method); + if (getNumStorageWriteApiStreams() != 0) { + LOG.warn("The setting of numStorageWriteApiStreams is ignored.
It can be specified only" + + " when writing an unbounded PCollection via STORAGE_API_WRITES, but the collection" + + " was {} and the method was {}.", input.isBounded(), method); + } } if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
@@ -3193,18 +3191,20 @@ public WriteResult expand(PCollection<T> input) { !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas."); } - if (method == Method.STORAGE_WRITE_API && input.isBounded() == IsBounded.UNBOUNDED) { - if (getNumStorageWriteApiStreams() > 0) { - checkArgument(!getAutoSharding(), "withAutoSharding only supported when" - + " numStorageWriteApiStream is zero or not set. Currently it is set to %s.", - getNumStorageWriteApiStreams()); - } + if (input.isBounded() == IsBounded.BOUNDED) { + checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input."); } else { - checkArgument(!getAutoSharding(), "withAutoSharding only supported" - + " when writing an unbounded PCollection via STORAGE_API_WRITES. but: the collection" - + " was %s and the method was %s", - input.isBounded(), - method); + if (method == Method.STORAGE_WRITE_API) { + if (getNumStorageWriteApiStreams() > 0 && getAutoSharding()) { + LOG.warn("The setting of auto-sharding is ignored. It is only supported when" + + " numStorageWriteApiStream is zero or not set, but it was set to {}.", + getNumStorageWriteApiStreams()); + } + } else if (method != Method.STREAMING_INSERTS) { + LOG.warn("The setting of auto-sharding is ignored. It is only supported when writing an" + + " unbounded PCollection via STREAMING_INSERTS or STORAGE_API_WRITES, but the" + + " collection was {} and the method was {}.", input.isBounded(), method); + } } if (getJsonTimePartitioning() != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
index 5f76f524bc34..714e494c33a0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
@@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.DoFn;
@@ -63,6 +65,7 @@ import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
@@ -85,6 +88,9 @@ public class StorageApiSinkSchemaChangeIT { private static final long RANDOM_SEED = 1; + @Rule + public transient ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class); + @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all
test cases.
@@ -407,9 +413,9 @@ public void testExceptionOnWriteExactlyOnceWithZeroTriggeringFreq() @Test public void testExceptionOnWriteExactlyOnceWithBothNumShardsAndAutoSharding() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, - 1, false)); + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, + 1, false); + loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); } @Test
@@ -433,18 +439,16 @@ public void testExceptionOnWriteAtLeastOnceWithTriggeringFreq() @Test public void testExceptionOnWriteAtLeastOnceWithAutoSharding() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, - true, 0, - false)); + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, true, 0, + false); + loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); } @Test public void testExceptionOnWriteAtLeastOnceWithNumShards() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, - false, 1, - false)); + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 1, + false); + loggedBigQueryIO.verifyWarn("The setting of numStorageWriteApiStreams is ignored."); } }

From 15e909d0cfac7ade8e6b832da483aee46b9c3c82 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 22 Jun 2023 00:56:34 -0400
Subject: [PATCH 3/5] Fix the formatting after spotlessCheck and checkStyle.
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 41 ++-- .../sdk/io/gcp/testing/BigqueryClient.java | 11 +- .../StorageApiSinkSchemaChangeIT.java | 181 +++++++++++------- 3 files changed, 145 insertions(+), 88 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 35177aac4c15..93c643bdc463 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3141,31 +3141,40 @@ public WriteResult expand(PCollection<T> input) { "When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, " + "triggering frequency must be specified"); } else { - checkArgument(getTriggeringFrequency() == null, + checkArgument( + getTriggeringFrequency() == null, "Triggering frequency can be specified only when writing an unbounded PCollection via" + " FILE_LOADS or STORAGE_API_WRITES, but the collection was %s and the method was" - + " %s.", input.isBounded(), method); + + " %s.", + input.isBounded(), + method); if (method == Method.STORAGE_WRITE_API) { BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); if (getStorageApiTriggeringFrequency(bqOptions) != null) { - LOG.warn("The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored."
- + " It is only supported when writing an unbounded PCollection via" - + " STORAGE_API_WRITES, but the collection was {} and the method was {}.", - input.isBounded(), method); + LOG.warn( + "The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored." + + " It is only supported when writing an unbounded PCollection via" + + " STORAGE_API_WRITES, but the collection was {} and the method was {}.", + input.isBounded(), + method); } } checkArgument( (getNumFileShards() == 0), "Number of file shards can be specified only when writing an unbounded PCollection via" - + " FILE_LOADS, but the collection was %s and the method was %s", input.isBounded(), + + " FILE_LOADS, but the collection was %s and the method was %s", + input.isBounded(), method); if (getNumStorageWriteApiStreams() != 0) { - LOG.warn("The setting of numStorageWriteApiStreams is ignored. It can be specified only" - + " when writing an unbounded PCollection via STORAGE_API_WRITES, but the collection" - + " was {} and the method was {}.", input.isBounded(), method); + LOG.warn( + "The setting of numStorageWriteApiStreams is ignored. It can be specified only" + + " when writing an unbounded PCollection via STORAGE_API_WRITES, but the collection" + + " was {} and the method was {}.", + input.isBounded(), + method); } } @@ -3196,14 +3205,18 @@ public WriteResult expand(PCollection input) { } else { if (method == Method.STORAGE_WRITE_API) { if (getNumStorageWriteApiStreams() > 0 && getAutoSharding()) { - LOG.warn("The setting of auto-sharding is ignored. It is only supported when" + LOG.warn( + "The setting of auto-sharding is ignored. It is only supported when" + " numStorageWriteApiStream is zero or not set, but it was set to {}.", getNumStorageWriteApiStreams()); } } else if (method != Method.STREAMING_INSERTS) { - LOG.warn("The setting of auto-sharding is ignored. It is only supported when writing an" - + " unbounded PCollection via STREAMING_INSERTS or STORAGE_API_WRITES, but the" - + " collection was {} and the method was {}.", input.isBounded(), method); + LOG.warn( + "The setting of auto-sharding is ignored. 
It is only supported when writing an" + + " unbounded PCollection via STREAMING_INSERTS or STORAGE_API_WRITES, but the" + + " collection was {} and the method was {}.", + input.isBounded(), + method); } }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
index ab8b59265f0b..82061be153b9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
@@ -571,12 +571,13 @@ public Table getTableResource(String projectId, String datasetId, String tableId lastException); } - public void updateTableSchema(String projectId, String datasetId, String tableId, - TableSchema newSchema) { + public void updateTableSchema( + String projectId, String datasetId, String tableId, TableSchema newSchema) { try { - this.bqClient.tables() - .patch(projectId, datasetId, tableId, - new Table().setSchema(newSchema)).execute(); + this.bqClient + .tables() + .patch(projectId, datasetId, tableId, new Table().setSchema(newSchema)) + .execute(); LOG.info("Successfully updated the schema of table: " + tableId); } catch (Exception e) { LOG.debug("Exceptions caught when updating table schema: " + e.getMessage());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
index 714e494c33a0..2941641a284b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
@@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue;
@@ -74,22 +73,32 @@ public class StorageApiSinkSchemaChangeIT { private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkSchemaChangeIT.class); - private static final BigqueryClient BQ_CLIENT = new BigqueryClient( - "StorageApiSinkSchemaChangeIT"); - private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class) - .getProject(); + private static final BigqueryClient BQ_CLIENT = + new BigqueryClient("StorageApiSinkSchemaChangeIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); private static final String BIG_QUERY_DATASET_ID = "storage_api_sink_schema_change" + System.nanoTime(); - private static final String[] FIELDS = {"BOOL", "BOOLEAN", "BYTES", "INT64", "INTEGER", "FLOAT", - "FLOAT64", "NUMERIC", "STRING", "DATE", "TIMESTAMP"}; + private static final String[] FIELDS = { + "BOOL", + "BOOLEAN", + "BYTES", + "INT64", + "INTEGER", + "FLOAT", + "FLOAT64", + "NUMERIC", + "STRING", + "DATE", + "TIMESTAMP" + }; private static final int MAX_N = 50; private static final long RANDOM_SEED = 1; - @Rule - public transient ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient
ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class); @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException {
@@ -107,18 +116,28 @@ private static String createTable(TableSchema tableSchema) throws IOException, InterruptedException { String tableId = "table" + System.nanoTime(); BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId); - BQ_CLIENT.createNewTable(PROJECT, BIG_QUERY_DATASET_ID, new Table().setSchema(tableSchema) - .setTableReference( - new TableReference().setTableId(tableId).setDatasetId(BIG_QUERY_DATASET_ID) - .setProjectId(PROJECT))); + BQ_CLIENT.createNewTable( + PROJECT, + BIG_QUERY_DATASET_ID, + new Table() + .setSchema(tableSchema) + .setTableReference( + new TableReference() + .setTableId(tableId) + .setDatasetId(BIG_QUERY_DATASET_ID) + .setProjectId(PROJECT))); return tableId; } private static boolean checkRowCompleteness(String tableSpec) throws IOException, InterruptedException { - TableRow queryResponse = Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(DISTINCT(id)), MIN(id), MAX(id) FROM %s", tableSpec), PROJECT, - true, true)); + TableRow queryResponse = + Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(DISTINCT(id)), MIN(id), MAX(id) FROM %s", tableSpec), + PROJECT, + true, + true)); int distinctCount = Integer.parseInt((String) queryResponse.get("f0_")); int rangeMin = Integer.parseInt((String) queryResponse.get("f1_"));
@@ -131,9 +150,13 @@ private static boolean checkRowCompleteness(String tableSpec) private static boolean checkRowDuplication(String tableSpec) throws IOException, InterruptedException { - TableRow queryResponse = Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(DISTINCT(id)), COUNT(id) FROM %s", tableSpec), PROJECT, true, - true)); + TableRow queryResponse = + Iterables.getOnlyElement( + BQ_CLIENT.queryUnflattened( + String.format("SELECT COUNT(DISTINCT(id)), COUNT(id) FROM %s", tableSpec), + PROJECT, + true, + true)); int distinctCount = Integer.parseInt((String) queryResponse.get("f0_")); int totalCount = Integer.parseInt((String) queryResponse.get("f1_"));
@@ -151,7 +174,7 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, Long>, Long> { private final String schemaString; - private transient BigqueryClient BQ_CLIENT; + private transient BigqueryClient bqClient; private static final String MY_COUNTER = "myCounter";
@@ -159,26 +182,27 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, Long>, Long> { @SuppressWarnings("unused") private final StateSpec<@org.jetbrains.annotations.NotNull ValueState<Integer>> counter; - public UpdateSchemaDoFn(String projectId, String datasetId, String tableId, - TableSchema schema) { + public UpdateSchemaDoFn( + String projectId, String datasetId, String tableId, TableSchema schema) { this.projectId = projectId; this.datasetId = datasetId; this.tableId = tableId; this.schemaString = BigQueryHelpers.toJsonString(schema); - this.BQ_CLIENT = null; + this.bqClient = null; this.counter = StateSpecs.value(); } private int getRowCount(String tableSpec) throws IOException, InterruptedException { - TableRow queryResponse = Iterables.getOnlyElement( - BQ_CLIENT.queryUnflattened(String.format("SELECT COUNT(*) FROM %s", tableSpec), PROJECT, - true, true)); + TableRow queryResponse = + Iterables.getOnlyElement( + bqClient.queryUnflattened( + String.format("SELECT COUNT(*) FROM %s", tableSpec), PROJECT, true, true)); return Integer.parseInt((String) queryResponse.get("f0_")); } @Setup
public void setup() { - BQ_CLIENT = new BigqueryClient("StorageApiSinkSchemaChangeIT_UpdateSchema"); + bqClient = new BigqueryClient("StorageApiSinkSchemaChangeIT_UpdateSchema"); } @Teardown
@@ -199,7 +223,10 @@ public void processElement(ProcessContext c, @StateId(MY_COUNTER) ValueState<Integer> counter) if (rowCount > 0) { - BQ_CLIENT.updateTableSchema(this.projectId, this.datasetId, this.tableId, + bqClient.updateTableSchema( + this.projectId, + this.datasetId, + this.tableId, BigQueryHelpers.fromJsonString(this.schemaString, TableSchema.class)); Thread.sleep(5000);
@@ -268,8 +295,8 @@ public TableRow apply(Long rowId) { } } - private static TableSchema makeTableSchemaFromTypes(List<String> fieldNames, - Set<String> nullableFieldNames) { + private static TableSchema makeTableSchemaFromTypes( + List<String> fieldNames, Set<String> nullableFieldNames) { ImmutableList.Builder<TableFieldSchema> builder = ImmutableList.builder(); // Add an id field for verification of correctness
@@ -288,9 +315,14 @@ private static TableSchema makeTableSchemaFromTypes(List<String> fieldNames, return new TableSchema().setFields(builder.build()); } - private static void runStreamingPipelineWithSchemaChange(Write.Method method, - boolean useAutoSchemaUpdate, int triggeringFreq, boolean useAutoSharding, int numShards, - boolean useIgnoreUnknownValues) throws IOException, InterruptedException { + private static void runStreamingPipelineWithSchemaChange( + Write.Method method, + boolean useAutoSchemaUpdate, + int triggeringFreq, + boolean useAutoSharding, + int numShards, + boolean useIgnoreUnknownValues) + throws IOException, InterruptedException { PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); BigQueryOptions bqOptions = pipelineOptions.as(BigQueryOptions.class); bqOptions.setStorageApiAppendThresholdRecordCount(1);
@@ -315,16 +347,16 @@ private static void runStreamingPipelineWithSchemaChange(Write.Method method, TableSchema bqWriteSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null); LOG.info("write schema: {}", BigQueryHelpers.toJsonString(bqWriteSchema)); - TableSchema bqTableSchemaUpdated = makeTableSchemaFromTypes(fieldNamesWithExtra, - ImmutableSet.of(extraField)); + TableSchema bqTableSchemaUpdated = + makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField)); LOG.info("updated table schema: {}", BigQueryHelpers.toJsonString(bqTableSchemaUpdated)); String tableId = createTable(bqTableSchema); String tableSpec = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + tableId; - TestStream.Builder<Long> testStream = TestStream.create(VarLongCoder.of()) - .advanceWatermarkTo(new Instant(0)); + TestStream.Builder<Long> testStream = + TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0)); // These rows contain unknown fields, which should be dropped.
for (long i = 0; i < 5; i++) { testStream = testStream.addElements(i);
@@ -339,15 +371,24 @@ private static void runStreamingPipelineWithSchemaChange(Write.Method method, PCollection<Long> source = p.apply("Generate numbers", testStream.advanceWatermarkToInfinity()); - PCollection<Long> input = source.apply("Add a dummy key", WithKeys.of(1)).apply("Update Schema", - ParDo.of( - new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, bqTableSchemaUpdated))); - - Write<Long> write = BigQueryIO.<Long>write().to(tableSpec) - .withAutoSchemaUpdate(useAutoSchemaUpdate).withSchema(bqWriteSchema) - .withFormatFunction(new GenerateRowFunc(fieldNamesOrigin)).withMethod(method) - .withCreateDisposition(CreateDisposition.CREATE_NEVER) - .withWriteDisposition(WriteDisposition.WRITE_APPEND); + PCollection<Long> input = + source + .apply("Add a dummy key", WithKeys.of(1)) + .apply( + "Update Schema", + ParDo.of( + new UpdateSchemaDoFn( + PROJECT, BIG_QUERY_DATASET_ID, tableId, bqTableSchemaUpdated))); + + Write<Long> write = + BigQueryIO.<Long>write() + .to(tableSpec) + .withAutoSchemaUpdate(useAutoSchemaUpdate) + .withSchema(bqWriteSchema) + .withFormatFunction(new GenerateRowFunc(fieldNamesOrigin)) + .withMethod(method) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); if (useAutoSharding) { write = write.withAutoSharding();
@@ -378,8 +419,7 @@ private static void runStreamingPipelineWithSchemaChange(Write.Method method, } @Test - public void testWriteExactlyOnceOnSchemaChange() - throws IOException, InterruptedException { + public void testWriteExactlyOnceOnSchemaChange() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, 0, false); }
@@ -391,64 +431,67 @@ public void testWriteExactlyOnceOnSchemaChangeWithAutoSchemaUpdate() @Test public void testWriteAtLeastOnceOnSchemaChange() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 0, - false); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 0, false); } @Test public void testWriteAtLeastOnceOnSchemaChangeWithAutoSchemaUpdate() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, 0, false, 0, - true); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_API_AT_LEAST_ONCE, true, 0, false, 0, true); } @Test public void testExceptionOnWriteExactlyOnceWithZeroTriggeringFreq() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 0, true, - 0, false)); + assertThrows( + IllegalArgumentException.class, + () -> + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_WRITE_API, false, 0, true, 0, false)); } @Test public void testExceptionOnWriteExactlyOnceWithBothNumShardsAndAutoSharding() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, - 1, false); + runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, 1, false); loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); } @Test public void testExceptionOnAutoSchemaUpdateWithoutIgnoreUnknownValues() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () ->
runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, true, 1, - true, 0, - false)); + assertThrows( + IllegalArgumentException.class, + () -> + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_WRITE_API, true, 1, true, 0, false)); } @Test public void testExceptionOnWriteAtLeastOnceWithTriggeringFreq() throws IOException, InterruptedException { - assertThrows(IllegalArgumentException.class, - () -> runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 1, - true, 0, - false)); + assertThrows( + IllegalArgumentException.class, + () -> + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 1, true, 0, false)); } @Test public void testExceptionOnWriteAtLeastOnceWithAutoSharding() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, true, 0, - false); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, true, 0, false); loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); } @Test public void testExceptionOnWriteAtLeastOnceWithNumShards() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 1, - false); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 1, false); loggedBigQueryIO.verifyWarn("The setting of numStorageWriteApiStreams is ignored."); } }

From 1d44573c3199158ae9655788222785d2b1317031 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 22 Jun 2023 11:01:49 -0400
Subject: [PATCH 4/5] Parameterize tests with the useWriteSchema option. Internally, we will decide whether to call withSchema() with a schema of shuffled fields based on this option.
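[Editorial note, not part of the diff below: a minimal sketch of the pattern this parameter introduces, using the test's own names. With CREATE_NEVER the table already exists, so the runs that skip withSchema() rely on the sink resolving the schema from the table itself.]

    Write<Long> write = BigQueryIO.<Long>write().to(tableSpec).withMethod(method);
    if (useWriteSchema) {
      // Attach the write schema (with shuffled field order) only in half of the
      // parameterized runs; the diff below adds exactly this conditional.
      write = write.withSchema(bqWriteSchema);
    }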
--- .../StorageApiSinkSchemaChangeIT.java | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
index 2941641a284b..e29022bae51a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java
@@ -66,10 +66,20 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class StorageApiSinkSchemaChangeIT { + @Parameterized.Parameters + public static Iterable<Object[]> data() { + return ImmutableList.of(new Object[] {true}, new Object[] {false}); + } + + @Parameterized.Parameter(0) + public boolean useWriteSchema; private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkSchemaChangeIT.class);
@@ -317,6 +327,7 @@ private static TableSchema makeTableSchemaFromTypes( private static void runStreamingPipelineWithSchemaChange( Write.Method method, + boolean useWriteSchema, boolean useAutoSchemaUpdate, int triggeringFreq, boolean useAutoSharding,
@@ -390,6 +401,10 @@ private static void runStreamingPipelineWithSchemaChange( .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND); + if (useWriteSchema) { + write = write.withSchema(bqWriteSchema); + } + if (useAutoSharding) { write = write.withAutoSharding(); }
@@ -420,26 +435,28 @@ private static void runStreamingPipelineWithSchemaChange( @Test public void testWriteExactlyOnceOnSchemaChange() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, 0, false); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_WRITE_API, useWriteSchema, false, 1, true, 0, false); } @Test public void testWriteExactlyOnceOnSchemaChangeWithAutoSchemaUpdate() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, true, 1, true, 0, true); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_WRITE_API, useWriteSchema, true, 1, true, 0, true); } @Test public void testWriteAtLeastOnceOnSchemaChange() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 0, false); + Write.Method.STORAGE_API_AT_LEAST_ONCE, useWriteSchema, false, 0, false, 0, false); } @Test public void testWriteAtLeastOnceOnSchemaChangeWithAutoSchemaUpdate() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_API_AT_LEAST_ONCE, true, 0, false, 0, true); + Write.Method.STORAGE_API_AT_LEAST_ONCE, useWriteSchema, true, 0, false, 0, true); } @Test
@@ -449,13 +466,14 @@ public void testExceptionOnWriteExactlyOnceWithZeroTriggeringFreq() IllegalArgumentException.class, () -> runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_WRITE_API, false, 0, true, 0, false)); + Write.Method.STORAGE_WRITE_API, useWriteSchema, false, 0, true, 0, false)); } @Test public void
testExceptionOnWriteExactlyOnceWithBothNumShardsAndAutoSharding() throws IOException, InterruptedException { - runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_WRITE_API, false, 1, true, 1, false); + runStreamingPipelineWithSchemaChange( + Write.Method.STORAGE_WRITE_API, useWriteSchema, false, 1, true, 1, false); loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); }
@@ -466,7 +484,7 @@ public void testExceptionOnAutoSchemaUpdateWithoutIgnoreUnknownValues() IllegalArgumentException.class, () -> runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_WRITE_API, true, 1, true, 0, false)); + Write.Method.STORAGE_WRITE_API, useWriteSchema, true, 1, true, 0, false)); } @Test
@@ -476,14 +494,14 @@ public void testExceptionOnWriteAtLeastOnceWithTriggeringFreq() IllegalArgumentException.class, () -> runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 1, true, 0, false)); + Write.Method.STORAGE_API_AT_LEAST_ONCE, useWriteSchema, false, 1, true, 0, false)); } @Test public void testExceptionOnWriteAtLeastOnceWithAutoSharding() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, true, 0, false); + Write.Method.STORAGE_API_AT_LEAST_ONCE, useWriteSchema, false, 0, true, 0, false); loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); }
@@ -491,7 +509,7 @@ public void testExceptionOnWriteAtLeastOnceWithAutoSharding() public void testExceptionOnWriteAtLeastOnceWithNumShards() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange( - Write.Method.STORAGE_API_AT_LEAST_ONCE, false, 0, false, 1, false); + Write.Method.STORAGE_API_AT_LEAST_ONCE, useWriteSchema, false, 0, false, 1, false); loggedBigQueryIO.verifyWarn("The setting of numStorageWriteApiStreams is ignored."); } }

From bc144d04111bbd18a3a9486754953efd92d5170e Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 22 Jun 2023 14:51:42 -0400
Subject: [PATCH 5/5] Update some warning messages. * Fix a few typos in the method name STORAGE_WRITE_API * Change the warning message when both numStorageWriteApiStreams and autoSharding are set. In this case, autoSharding takes priority. * Add an argument check for using both numFileShards and autoSharding via FILE_LOADS.
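[Editorial note, not part of the diff below: a minimal sketch of the precedence this patch establishes for an unbounded STORAGE_WRITE_API write; the quoted warning text comes from the change itself.]

    // When both sharding options are set, autoSharding takes priority and the sink
    // logs: "The setting of numStorageWriteApiStreams is ignored. It is only
    // supported when autoSharding is not enabled."
    write = write.withTriggeringFrequency(Duration.standardSeconds(1))
        .withNumStorageWriteApiStreams(4)
        .withAutoSharding();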
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 23 +++++++++++-------- .../StorageApiSinkSchemaChangeIT.java | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 93c643bdc463..5b8f20ce4f41 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3138,13 +3138,13 @@ public WriteResult expand(PCollection<T> input) { : getTriggeringFrequency(); checkArgument( triggeringFrequency != null, - "When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, " + "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, " + "triggering frequency must be specified"); } else { checkArgument( getTriggeringFrequency() == null, "Triggering frequency can be specified only when writing an unbounded PCollection via" - + " FILE_LOADS or STORAGE_API_WRITES, but the collection was %s and the method was" + + " FILE_LOADS or STORAGE_WRITE_API, but the collection was %s and the method was" + " %s.", input.isBounded(), method);
@@ -3155,7 +3155,7 @@ public WriteResult expand(PCollection<T> input) { LOG.warn( "The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored." + " It is only supported when writing an unbounded PCollection via" - + " STORAGE_API_WRITES, but the collection was {} and the method was {}.", + + " STORAGE_WRITE_API, but the collection was {} and the method was {}.", input.isBounded(), method); }
@@ -3171,7 +3171,7 @@ public WriteResult expand(PCollection<T> input) { if (getNumStorageWriteApiStreams() != 0) { LOG.warn( "The setting of numStorageWriteApiStreams is ignored. It can be specified only" - + " when writing an unbounded PCollection via STORAGE_API_WRITES, but the collection" + + " when writing an unbounded PCollection via STORAGE_WRITE_API, but the collection" + " was {} and the method was {}.", input.isBounded(), method);
@@ -3206,15 +3206,20 @@ public WriteResult expand(PCollection<T> input) { if (method == Method.STORAGE_WRITE_API) { if (getNumStorageWriteApiStreams() > 0 && getAutoSharding()) { LOG.warn( - "The setting of auto-sharding is ignored. It is only supported when" - + " numStorageWriteApiStream is zero or not set, but it was set to {}.", - getNumStorageWriteApiStreams()); + "The setting of numStorageWriteApiStreams is ignored. It is only supported when" + + " autoSharding is not enabled."); + } + } else if (method == Method.FILE_LOADS) { + if (getNumFileShards() > 0 && getAutoSharding()) { + LOG.warn( + "The setting of numFileShards is ignored. It is only supported when autoSharding is" + + " not enabled."); } } else if (method != Method.STREAMING_INSERTS) { LOG.warn( "The setting of auto-sharding is ignored.
It is only supported when writing an" - + " unbounded PCollection via STREAMING_INSERTS or STORAGE_API_WRITES, but the" - + " collection was {} and the method was {}.", + + " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or" + + " STORAGE_WRITE_API, but the collection was {} and the method was {}.", input.isBounded(), method); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java index e29022bae51a..8f3434799a78 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaChangeIT.java @@ -474,7 +474,7 @@ public void testExceptionOnWriteExactlyOnceWithBothNumShardsAndAutoSharding() throws IOException, InterruptedException { runStreamingPipelineWithSchemaChange( Write.Method.STORAGE_WRITE_API, useWriteSchema, false, 1, true, 1, false); - loggedBigQueryIO.verifyWarn("The setting of auto-sharding is ignored."); + loggedBigQueryIO.verifyWarn("The setting of numStorageWriteApiStreams is ignored."); } @Test