diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index eded23f19427..f6e2b9b147c5 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -562,7 +562,6 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
   include '**/*IT.class'
   exclude '**/BigQueryIOReadIT.class'
   exclude '**/BigQueryIOStorageReadTableRowIT.class'
-  exclude '**/StorageApiSinkSchemaUpdateIT.class' // IT based on test stream
   exclude '**/PubsubReadIT.class'
   exclude '**/FhirIOReadIT.class'
   exclude '**/gcp/spanner/changestreams/it/*.class'
@@ -606,7 +605,6 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
   include '**/*IT.class'
   exclude '**/BigQueryIOStorageReadTableRowIT.class'
-  exclude '**/StorageApiSinkSchemaUpdateIT.class' // IT based on test stream
   exclude '**/SpannerWriteIT.class'
   exclude '**/*KmsKeyIT.class'
   exclude '**/FhirIOReadIT.class'
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
index fc836bc3e475..3679c3eb10f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -38,6 +38,7 @@ public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
   Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
   Duration fireInterval = Duration.standardMinutes(1);
   boolean applyWindowing = false;
+  boolean catchUpToNow = true;
 
   private PeriodicImpulse() {}
 
@@ -65,6 +66,16 @@ public PeriodicImpulse applyWindowing() {
     return this;
   }
 
+  /**
+   * The default behavior is that PeriodicImpulse emits all instants up to Instant.now(), then
+   * starts firing at the specified interval. If this is set to false, PeriodicImpulse will wait
+   * for one interval before firing each instant.
+   */
+  public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
+    this.catchUpToNow = catchUpToNow;
+    return this;
+  }
+
   @Override
   public PCollection<Instant> expand(PBegin input) {
     PCollection<Instant> result =
@@ -72,7 +83,7 @@ public PCollection<Instant> expand(PBegin input) {
         .apply(
             Create.of(
                 new PeriodicSequence.SequenceDefinition(
-                    startTimestamp, stopTimestamp, fireInterval)))
+                    startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
         .apply(PeriodicSequence.create());
 
     if (this.applyWindowing) {
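Note: a minimal usage sketch of the new knob added above — not part of this change; the pipeline options, transform name, and timing values are illustrative only.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PeriodicImpulseCatchUpToNowExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Instant start = new Instant(0);
    // With catchUpToNow(false) the transform waits a full interval before each firing instead of
    // emitting every instant up to Instant.now() in one burst.
    PCollection<Instant> ticks =
        p.apply(
            "Generate Instants",
            PeriodicImpulse.create()
                .startAt(start)
                .stopAt(start.plus(Duration.standardSeconds(59)))
                .withInterval(Duration.standardSeconds(1))
                .catchUpToNow(false));
    // Downstream transforms would consume 'ticks' here.

    p.run().waitUntilFinish();
  }
}
```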
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index 4b8e219199d1..b3cd2afde697 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -56,6 +56,7 @@ public static class SequenceDefinition {
     public Instant first;
     public Instant last;
     public Long durationMilliSec;
+    public boolean catchUpToNow;
 
     public SequenceDefinition() {}
 
@@ -63,6 +64,15 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) {
       this.first = first;
       this.last = last;
       this.durationMilliSec = duration.getMillis();
+      this.catchUpToNow = true;
+    }
+
+    public SequenceDefinition(
+        Instant first, Instant last, Duration duration, boolean catchUpToNow) {
+      this.first = first;
+      this.last = last;
+      this.durationMilliSec = duration.getMillis();
+      this.catchUpToNow = catchUpToNow;
     }
 
     @Override
@@ -223,11 +233,17 @@ public ProcessContinuation processElement(
         estimator.setWatermark(output);
         nextOutput = nextOutput + interval;
       }
+      if (!srcElement.catchUpToNow) {
+        break;
+      }
     }
     ProcessContinuation continuation = ProcessContinuation.stop();
     if (claimSuccess) {
-      Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
+      Duration offset =
+          srcElement.catchUpToNow
+              ? new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput))
+              : new Duration(interval);
       continuation = ProcessContinuation.resume().withResumeDelay(offset);
     }
     return continuation;
   }
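Note: for pipelines that drive PeriodicSequence directly, the same behavior is opted into through the new four-argument SequenceDefinition constructor. A small sketch, roughly what PeriodicImpulse.catchUpToNow(false) expands to (helper name and parameters are illustrative):

```java
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PeriodicSequence;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

class PeriodicSequenceFixedRateSketch {
  // Emits one instant per interval, resuming a fixed interval after each firing because the
  // fourth SequenceDefinition argument (catchUpToNow) is false.
  static PCollection<Instant> fixedRateTicks(
      PBegin input, Instant start, Instant stop, Duration interval) {
    return input
        .apply(Create.of(new PeriodicSequence.SequenceDefinition(start, stop, interval, false)))
        .apply(PeriodicSequence.create());
  }
}
```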
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
index 8d8fe8e811e4..6931b7ac9b98 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
@@ -20,6 +20,7 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -27,6 +28,7 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,25 +36,28 @@
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-import org.apache.beam.runners.direct.DirectOptions;
+import java.util.function.Function;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
@@ -111,7 +116,14 @@ public static Iterable<Object[]> data() {
       "TIMESTAMP"
     };
 
-  private static final int TOTAL_N = 35;
+  // ************ NOTE ************
+  // The test may fail if Storage API Streams take longer than expected to recognize
+  // an updated schema. If that happens consistently, just increase these two numbers
+  // to give it more time.
+  // Total number of rows written to the sink
+  private static final int TOTAL_N = 60;
+  // Number of rows with the original schema
+  private static final int ORIGINAL_N = 50;
 
   private final Random randomGenerator = new Random();
 
@@ -185,10 +197,9 @@ public void setup() {
     public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
         throws Exception {
       int current = firstNonNull(counter.read(), 0);
-      Thread.sleep(1000);
-      // We update schema early on to leave a healthy amount of time for StreamWriter to recognize
-      // it.
-      if (current == 2) {
+      // We update schema early on to leave a healthy amount of time for
+      // StreamWriter to recognize it.
+      if (current == 10) {
         bqClient.updateTableSchema(
             projectId,
             datasetId,
@@ -203,9 +214,11 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
   }
 
   static class GenerateRowFunc implements SerializableFunction<Long, TableRow> {
     private final List<String> fieldNames;
+    private final List<String> fieldNamesWithExtra;
 
-    public GenerateRowFunc(List<String> fieldNames) {
+    public GenerateRowFunc(List<String> fieldNames, List<String> fieldNamesWithExtra) {
       this.fieldNames = fieldNames;
+      this.fieldNamesWithExtra = fieldNamesWithExtra;
     }
 
     @Override
@@ -213,7 +226,9 @@ public TableRow apply(Long rowId) {
       TableRow row = new TableRow();
       row.set("id", rowId);
 
-      for (String name : fieldNames) {
+      List<String> fields = rowId < ORIGINAL_N ? fieldNames : fieldNamesWithExtra;
+
+      for (String name : fields) {
         String type = Iterables.get(Splitter.on('_').split(name), 0);
         switch (type) {
           case "BOOL":
@@ -280,8 +295,21 @@ private void runStreamingPipelineWithSchemaChange(
       Write.Method method, boolean useAutoSchemaUpdate, boolean useIgnoreUnknownValues)
       throws Exception {
     Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    // Set threshold bytes to 0 so that the stream attempts to fetch an updated schema after each
+    // append
     p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
-    p.getOptions().as(DirectOptions.class).setTargetParallelism(1);
+    // Limit parallelism so that all streams recognize the new schema within a short, predictable
+    // amount of time (before we start writing rows with the updated schema)
+    p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
+    // Need to manually enable Streaming Engine for the legacy Dataflow runner
+    ExperimentalOptions.addExperiment(
+        p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    // Only run the most relevant test case on Dataflow
+    if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
+      assumeTrue(
+          "Skipping in favor of more relevant test case",
+          changeTableSchema && useInputSchema && useAutoSchemaUpdate);
+    }
 
     List<String> fieldNamesOrigin = new ArrayList<String>(Arrays.asList(FIELDS));
 
@@ -303,26 +331,6 @@ private void runStreamingPipelineWithSchemaChange(
     String tableId = createTable(bqTableSchema);
     String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;
 
-    TestStream.Builder<TableRow> testStream =
-        TestStream.create(TableRowJsonCoder.of()).advanceWatermarkTo(new Instant(0));
-
-    // Generate rows with original schema
-    int numOriginalRows = 30;
-    GenerateRowFunc originalSchemaFunc = new GenerateRowFunc(fieldNamesOrigin);
-    for (long i = 0; i < numOriginalRows; i++) {
-      testStream = testStream.addElements(originalSchemaFunc.apply(i));
-      testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
-    }
-
-    // Generate rows with updated schema
-    // These rows should only reach the table if ignoreUnknownValues is set,
-    // and the extra field should be present only when autoSchemaUpdate is set
-    GenerateRowFunc updatedSchemaFunc = new GenerateRowFunc(fieldNamesWithExtra);
-    for (long i = numOriginalRows; i < TOTAL_N; i++) {
-      testStream = testStream.addElements(updatedSchemaFunc.apply(i));
-      testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
-    }
-
     // build write transform
     Write<TableRow> write =
         BigQueryIO.writeTableRows()
@@ -341,8 +349,39 @@ private void runStreamingPipelineWithSchemaChange(
       write = write.ignoreUnknownValues();
     }
 
-    // build pipeline
-    PCollection<TableRow> rows = p.apply("Generate rows", testStream.advanceWatermarkToInfinity());
+    // set up and build pipeline
+    Instant start = new Instant(0);
+    // We give a healthy waiting period between each element to give Storage API streams a chance
+    // to recognize the new schema. Applied only to the relevant test cases.
+    boolean waitLonger = changeTableSchema && (useAutoSchemaUpdate || !useInputSchema);
+    Duration interval = waitLonger ? Duration.standardSeconds(1) : Duration.millis(1);
+    Duration stop =
+        waitLonger ? Duration.standardSeconds(TOTAL_N - 1) : Duration.millis(TOTAL_N - 1);
+    Function<Instant, Long> getIdFromInstant =
+        waitLonger
+            ? (Function<Instant, Long> & Serializable)
+                (Instant instant) -> instant.getMillis() / 1000
+            : (Function<Instant, Long> & Serializable) (Instant instant) -> instant.getMillis();
+
+    // Generates rows with the original schema for row IDs under ORIGINAL_N,
+    // then rows with the updated schema for the rest.
+    // Rows with the updated schema should only reach the table if ignoreUnknownValues is set,
+    // and the extra field should be present only when autoSchemaUpdate is set
+    GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra);
+    PCollection<Instant> instants =
+        p.apply(
+            "Generate Instants",
+            PeriodicImpulse.create()
+                .startAt(start)
+                .stopAt(start.plus(stop))
+                .withInterval(interval)
+                .catchUpToNow(false));
+    PCollection<TableRow> rows =
+        instants.apply(
+            "Create TableRows",
+            MapElements.into(TypeDescriptor.of(TableRow.class))
+                .via(instant -> generateRowFunc.apply(getIdFromInstant.apply(instant))));
+
     if (changeTableSchema) {
       rows =
           rows
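Note: a standalone sanity check — not part of the test — of the instant-to-row-id mapping used by the PeriodicImpulse-based generator above, assuming the epoch start and the one-second interval of the waitLonger branch:

```java
import org.joda.time.Duration;
import org.joda.time.Instant;

class InstantToRowIdCheck {
  public static void main(String[] args) {
    Instant start = new Instant(0);
    Duration interval = Duration.standardSeconds(1);
    int totalN = 60; // mirrors TOTAL_N
    // PeriodicImpulse emits start, start+interval, ..., start+stop, i.e. TOTAL_N instants when
    // stop = (TOTAL_N - 1) seconds; getMillis() / 1000 then recovers row ids 0..TOTAL_N-1.
    for (int id = 0; id < totalN; id++) {
      Instant emitted = start.plus(interval.multipliedBy(id));
      long recovered = emitted.getMillis() / 1000;
      if (recovered != id) {
        throw new AssertionError("Unexpected row id " + recovered + " for instant " + emitted);
      }
    }
    System.out.println("Recovered row ids 0.." + (totalN - 1) + " as expected");
  }
}
```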
@@ -356,20 +395,22 @@ private void runStreamingPipelineWithSchemaChange(
     WriteResult result = rows.apply("Stream to BigQuery", write);
     if (useIgnoreUnknownValues) {
       // We ignore the extra fields, so no rows should have been sent to DLQ
-      PAssert.that(result.getFailedStorageApiInserts()).empty();
+      PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
     } else {
       // When we don't set ignoreUnknownValues, the rows with extra fields should be sent to DLQ.
-      PAssert.that(result.getFailedStorageApiInserts())
-          .satisfies(new VerifyPCollectionSize(TOTAL_N - numOriginalRows, extraField));
+      PAssert.that(
+              String.format("Check DLQ has %s schema errors", TOTAL_N - ORIGINAL_N),
+              result.getFailedStorageApiInserts())
+          .satisfies(new VerifyPCollectionSize(TOTAL_N - ORIGINAL_N, extraField));
     }
     p.run().waitUntilFinish();
 
     // Check row completeness, non-duplication, and that schema update works as intended.
-    int expectedCount = useIgnoreUnknownValues ? TOTAL_N : numOriginalRows;
+    int expectedCount = useIgnoreUnknownValues ? TOTAL_N : ORIGINAL_N;
     boolean checkNoDuplication = (method == Write.Method.STORAGE_WRITE_API) ? true : false;
     checkRowCompleteness(tableSpec, expectedCount, checkNoDuplication);
     if (useIgnoreUnknownValues) {
-      checkRowsWithUpdatedSchema(tableSpec, extraField, numOriginalRows, useAutoSchemaUpdate);
+      checkRowsWithUpdatedSchema(tableSpec, extraField, useAutoSchemaUpdate);
     }
   }
 
@@ -386,21 +427,29 @@ private static class VerifyPCollectionSize
     @Override
     public Void apply(Iterable<BigQueryStorageApiInsertError> input) {
      List<BigQueryStorageApiInsertError> itemList = new ArrayList<>();
+      String expectedError = "SchemaTooNarrowException";
       for (BigQueryStorageApiInsertError err : input) {
         itemList.add(err);
         // Check the error message is due to schema mismatch from the extra field.
-        assertTrue(err.getErrorMessage().contains("SchemaTooNarrowException"));
-        assertTrue(err.getErrorMessage().contains(extraField));
+        assertTrue(
+            String.format(
+                "Didn't find expected [%s] error in failed message: %s", expectedError, err),
+            err.getErrorMessage().contains(expectedError));
+        assertTrue(
+            String.format(
+                "Didn't find expected [%s] schema field in failed message: %s", extraField, err),
+            err.getErrorMessage().contains(extraField));
       }
       // Check we have the expected number of rows in DLQ.
       // Should be equal to number of rows with extra fields.
+      LOG.info("Found {} failed rows in DLQ", itemList.size());
       assertEquals(expectedSize, itemList.size());
       return null;
     }
   }
 
   // Check the expected number of rows reached the table.
-  // If appropriate (using STORAGE_WRITE_API), check no duplication happened.
+  // If using STORAGE_WRITE_API, check no duplication happened.
   private static void checkRowCompleteness(
       String tableSpec, int expectedCount, boolean checkNoDuplication)
       throws IOException, InterruptedException {
@@ -426,7 +475,7 @@ private static void checkRowCompleteness(
   // Performs checks on the table's rows under different conditions.
   // Note: these should only be performed when ignoreUnknownValues is set.
   public void checkRowsWithUpdatedSchema(
-      String tableSpec, String extraField, int numOriginalRows, boolean useAutoSchemaUpdate)
+      String tableSpec, String extraField, boolean useAutoSchemaUpdate)
       throws IOException, InterruptedException {
     List<TableRow> actualRows =
         BQ_CLIENT.queryUnflattened(
@@ -434,15 +483,19 @@ public void checkRowsWithUpdatedSchema(
 
     for (TableRow row : actualRows) {
       // Rows written to the table should not have the extra field if
-      // 1. The original row didn't have it in the first place
+      // 1. The row has the original schema
       // 2. We didn't set autoSchemaUpdate (the extra field would just be dropped)
       // 3. We didn't change the table's schema (again, the extra field would be dropped)
-      if (Integer.parseInt((String) row.get("id")) < numOriginalRows
+      if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N
           || !useAutoSchemaUpdate
           || !changeTableSchema) {
-        assertTrue(row.get(extraField) == null);
+        assertTrue(
+            String.format("Expected row to NOT have field %s:\n%s", extraField, row),
+            row.get(extraField) == null);
       } else {
-        assertTrue(row.get(extraField) != null);
+        assertTrue(
+            String.format("Expected row to have field %s:\n%s", extraField, row),
+            row.get(extraField) != null);
       }
     }
   }
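Note: the per-row expectation encoded by the loop above, distilled into a hypothetical helper for readability (the constant mirrors the test's ORIGINAL_N; the helper is not part of the test):

```java
class ExpectedSchemaMatrix {
  // Mirrors the test's ORIGINAL_N: rows with id < ORIGINAL_N were generated with the original schema.
  static final int ORIGINAL_N = 50;

  // Only rows written after the schema change, with autoSchemaUpdate enabled and the table schema
  // actually updated, are expected to carry the extra field; every other row must not have it.
  static boolean expectExtraField(long rowId, boolean useAutoSchemaUpdate, boolean changeTableSchema) {
    return rowId >= ORIGINAL_N && useAutoSchemaUpdate && changeTableSchema;
  }
}
```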