Use Periodic Impulse for BQ SchemaUpdate tests (apache#27998)
* use periodic impulse for schema update tests; manually enable streaming engine; only run most relevant tests on dataflow runner

* enable test for dataflow runner

* spotless

* increase num rows

* limit parallelism on directrunner, make tests run faster when possible

* use project

* spotless

* limit stream parallelism

* wait longer when not using input schema
ahmedabu98 authored and lostluck committed Aug 30, 2023
1 parent f4875f8 commit b4d30af
Showing 4 changed files with 127 additions and 49 deletions.
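For context before the per-file diffs: the test previously scripted its input with TestStream, whose synthetic processing-time clock is why the Dataflow integration suites had to exclude it (see the build.gradle hunks below); it now drives itself with a PeriodicImpulse that fires real elements. A rough before/after sketch, assuming a Pipeline p and the imports that appear later in this diff:

// Before (sketch): elements and processing-time advances are scripted.
TestStream<TableRow> input =
    TestStream.create(TableRowJsonCoder.of())
        .advanceWatermarkTo(new Instant(0))
        .addElements(new TableRow().set("id", 0L))
        .advanceProcessingTime(Duration.standardSeconds(5))
        .advanceWatermarkToInfinity();

// After (sketch): PeriodicImpulse emits real Instants at a real interval,
// so the same test can also run on Dataflow.
PCollection<Instant> ticks =
    p.apply(
        PeriodicImpulse.create()
            .startAt(new Instant(0))
            .stopAt(new Instant(0).plus(Duration.standardSeconds(59)))
            .withInterval(Duration.standardSeconds(1))
            .catchUpToNow(false));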
2 changes: 0 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
@@ -562,7 +562,6 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
include '**/*IT.class'
exclude '**/BigQueryIOReadIT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
-exclude '**/StorageApiSinkSchemaUpdateIT.class' // IT based on test stream
exclude '**/PubsubReadIT.class'
exclude '**/FhirIOReadIT.class'
exclude '**/gcp/spanner/changestreams/it/*.class'
@@ -606,7 +605,6 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {

include '**/*IT.class'
exclude '**/BigQueryIOStorageReadTableRowIT.class'
-exclude '**/StorageApiSinkSchemaUpdateIT.class' // IT based on test stream
exclude '**/SpannerWriteIT.class'
exclude '**/*KmsKeyIT.class'
exclude '**/FhirIOReadIT.class'
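With the two excludes gone, StorageApiSinkSchemaUpdateIT is picked up by the existing include '**/*IT.class' pattern. Presumably it can also be run on its own with standard Gradle test filtering, e.g. ./gradlew :runners:google-cloud-dataflow-java:googleCloudPlatformLegacyWorkerIntegrationTest --tests '*StorageApiSinkSchemaUpdateIT' (the task names are from this diff; the --tests flag is stock Gradle).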
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -38,6 +38,7 @@ public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
Duration fireInterval = Duration.standardMinutes(1);
boolean applyWindowing = false;
boolean catchUpToNow = true;

private PeriodicImpulse() {}

@@ -65,14 +66,24 @@ public PeriodicImpulse applyWindowing() {
return this;
}

/**
* The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
* perform the interval wait before firing each instant.
*/
public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
this.catchUpToNow = catchUpToNow;
return this;
}

@Override
public PCollection<Instant> expand(PBegin input) {
PCollection<Instant> result =
input
.apply(
Create.<PeriodicSequence.SequenceDefinition>of(
new PeriodicSequence.SequenceDefinition(
-startTimestamp, stopTimestamp, fireInterval)))
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
.apply(PeriodicSequence.create());

if (this.applyWindowing) {
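A small usage sketch of the new knob (pipeline is a placeholder; startAt, stopAt, and withInterval already exist on this transform):

// Fire one Instant per second for five minutes, in (near) real time.
// With the default catchUpToNow(true), a startAt() in the past is instead
// replayed as a burst up to Instant.now() before steady firing begins.
PCollection<Instant> ticks =
    pipeline.apply(
        PeriodicImpulse.create()
            .startAt(Instant.now())
            .stopAt(Instant.now().plus(Duration.standardMinutes(5)))
            .withInterval(Duration.standardSeconds(1))
            .catchUpToNow(false));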
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -56,13 +56,23 @@ public static class SequenceDefinition {
public Instant first;
public Instant last;
public Long durationMilliSec;
public boolean catchUpToNow;

public SequenceDefinition() {}

public SequenceDefinition(Instant first, Instant last, Duration duration) {
this.first = first;
this.last = last;
this.durationMilliSec = duration.getMillis();
this.catchUpToNow = true;
}

public SequenceDefinition(
Instant first, Instant last, Duration duration, boolean catchUpToNow) {
this.first = first;
this.last = last;
this.durationMilliSec = duration.getMillis();
this.catchUpToNow = catchUpToNow;
}

@Override
@@ -223,11 +233,17 @@ public ProcessContinuation processElement(
estimator.setWatermark(output);
nextOutput = nextOutput + interval;
}
if (!srcElement.catchUpToNow) {
break;
}
}

ProcessContinuation continuation = ProcessContinuation.stop();
if (claimSuccess) {
-Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
Duration offset =
srcElement.catchUpToNow
? new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput))
: new Duration(interval);
continuation = ProcessContinuation.resume().withResumeDelay(offset);
}
return continuation;
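Put differently, the resume delay handed back to the runner now depends on the mode. A standalone sketch of just that computation (mirroring the hunk above; nextOutput is the epoch-millis timestamp of the next element, interval the firing period in millis):

import org.joda.time.Duration;
import org.joda.time.Instant;

class ResumeDelaySketch {
  static Duration resumeDelay(boolean catchUpToNow, long nextOutput, long interval) {
    return catchUpToNow
        // Catch-up mode: wait only until the next scheduled output; if that
        // instant is already past, the duration is negative and the runner
        // in effect resumes immediately, draining the backlog at full speed.
        ? new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput))
        // Real-time mode: the new break above stops after one output per
        // invocation, and a full interval is waited before the next one.
        : new Duration(interval);
  }
}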
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java
@@ -20,39 +20,44 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
-import org.apache.beam.runners.direct.DirectOptions;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
@@ -111,7 +116,14 @@ public static Iterable<Object[]> data() {
"TIMESTAMP"
};

-private static final int TOTAL_N = 35;
// ************ NOTE ************
// The test may fail if Storage API Streams take longer than expected to recognize
// an updated schema. If that happens consistently, just increase these two numbers
// to give it more time.
// Total number of rows written to the sink
private static final int TOTAL_N = 60;
// Number of rows with the original schema
private static final int ORIGINAL_N = 50;
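As a worked example of the slow path below (one row per second): the schema update is issued around row 10 and rows carrying the extra field only begin at ORIGINAL_N = 50, so the Storage API streams get roughly 40 seconds to pick up the new schema before the first widened row arrives.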

private final Random randomGenerator = new Random();

@@ -185,10 +197,9 @@ public void setup() {
public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
throws Exception {
int current = firstNonNull(counter.read(), 0);
-Thread.sleep(1000);
-// We update schema early on to leave a healthy amount of time for StreamWriter to recognize
-// it.
-if (current == 2) {
// We update schema early on to leave a healthy amount of time for
// StreamWriter to recognize it.
if (current == 10) {
bqClient.updateTableSchema(
projectId,
datasetId,
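The trigger lives in a per-key counter held in Beam state (the rest of the call is collapsed in this view). A minimal sketch of the same pattern, using the imports already present in this file, with the side effect left as a placeholder:

static class SideEffectAtNthElement extends DoFn<KV<Integer, TableRow>, TableRow> {
  private static final String ROW_COUNTER = "rowCounter";

  @StateId(ROW_COUNTER)
  @SuppressWarnings("unused")
  private final StateSpec<ValueState<Integer>> counterSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter) {
    int current = firstNonNull(counter.read(), 0);
    if (current == 10) {
      // e.g. bqClient.updateTableSchema(...) in the real test.
    }
    counter.write(current + 1);
    c.output(c.element().getValue());
  }
}

State is partitioned per key, which is presumably why the test routes every row through a single synthetic key (WithKeys) before this DoFn.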
@@ -203,17 +214,21 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<In

static class GenerateRowFunc implements SerializableFunction<Long, TableRow> {
private final List<String> fieldNames;
private final List<String> fieldNamesWithExtra;

-public GenerateRowFunc(List<String> fieldNames) {
public GenerateRowFunc(List<String> fieldNames, List<String> fieldNamesWithExtra) {
this.fieldNames = fieldNames;
this.fieldNamesWithExtra = fieldNamesWithExtra;
}

@Override
public TableRow apply(Long rowId) {
TableRow row = new TableRow();
row.set("id", rowId);

-for (String name : fieldNames) {
List<String> fields = rowId < ORIGINAL_N ? fieldNames : fieldNamesWithExtra;

for (String name : fields) {
String type = Iterables.get(Splitter.on('_').split(name), 0);
switch (type) {
case "BOOL":
@@ -280,8 +295,21 @@ private void runStreamingPipelineWithSchemaChange(
Write.Method method, boolean useAutoSchemaUpdate, boolean useIgnoreUnknownValues)
throws Exception {
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
// Set threshold bytes to 0 so that the stream attempts to fetch an updated schema after each
// append
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
-p.getOptions().as(DirectOptions.class).setTargetParallelism(1);
// Limit parallelism so that all streams recognize the new schema in an expected short amount
// of time (before we start writing rows with updated schema)
p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
// Need to manually enable streaming engine for legacy dataflow runner
ExperimentalOptions.addExperiment(
p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
// Only run the most relevant test case on Dataflow
if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
assumeTrue(
"Skipping in favor of more relevant test case",
changeTableSchema && useInputSchema && useAutoSchemaUpdate);
}
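The BigQueryOptions and experiment settings above are plain PipelineOptions properties, so an equivalent configuration could be passed as flags; this invocation is illustrative (flag names assume Beam's standard getter-to-flag derivation) and not part of the diff:

// Requires org.apache.beam.sdk.options.PipelineOptionsFactory.
BigQueryOptions options =
    PipelineOptionsFactory.fromArgs(
            "--storageApiAppendThresholdBytes=0",
            "--numStorageWriteApiStreams=3",
            "--experiments=enable_streaming_engine")
        .as(BigQueryOptions.class);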

List<String> fieldNamesOrigin = new ArrayList<String>(Arrays.asList(FIELDS));

@@ -303,26 +331,6 @@ private void runStreamingPipelineWithSchemaChange(
String tableId = createTable(bqTableSchema);
String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;

-TestStream.Builder<TableRow> testStream =
-TestStream.create(TableRowJsonCoder.of()).advanceWatermarkTo(new Instant(0));

-// Generate rows with original schema
-int numOriginalRows = 30;
-GenerateRowFunc originalSchemaFunc = new GenerateRowFunc(fieldNamesOrigin);
-for (long i = 0; i < numOriginalRows; i++) {
-testStream = testStream.addElements(originalSchemaFunc.apply(i));
-testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
-}

-// Generate rows with updated schema
-// These rows should only reach the table if ignoreUnknownValues is set,
-// and the extra field should be present only when autoSchemaUpdate is set
-GenerateRowFunc updatedSchemaFunc = new GenerateRowFunc(fieldNamesWithExtra);
-for (long i = numOriginalRows; i < TOTAL_N; i++) {
-testStream = testStream.addElements(updatedSchemaFunc.apply(i));
-testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
-}

// build write transform
Write<TableRow> write =
BigQueryIO.writeTableRows()
@@ -341,8 +349,39 @@ private void runStreamingPipelineWithSchemaChange(
write = write.ignoreUnknownValues();
}
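The collapsed builder above is the Storage Write API sink under test. A hedged sketch of its likely shape (table spec and dispositions are placeholders; a real unbounded STORAGE_WRITE_API pipeline typically also sets a triggering frequency):

Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table spec
        .withMethod(method) // STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND);
if (useAutoSchemaUpdate) {
  // Lets StreamWriters pick up BigQuery-side schema changes mid-stream.
  write = write.withAutoSchemaUpdate(true);
}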

-// build pipeline
-PCollection<TableRow> rows = p.apply("Generate rows", testStream.advanceWatermarkToInfinity());
// set up and build pipeline
Instant start = new Instant(0);
// We give a healthy waiting period between each element to give Storage API streams a chance to
// recognize the new schema. Applied only to the relevant test cases.
boolean waitLonger = changeTableSchema && (useAutoSchemaUpdate || !useInputSchema);
Duration interval = waitLonger ? Duration.standardSeconds(1) : Duration.millis(1);
Duration stop =
waitLonger ? Duration.standardSeconds(TOTAL_N - 1) : Duration.millis(TOTAL_N - 1);
Function<Instant, Long> getIdFromInstant =
waitLonger
? (Function<Instant, Long> & Serializable)
(Instant instant) -> instant.getMillis() / 1000
: (Function<Instant, Long> & Serializable) (Instant instant) -> instant.getMillis();
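The intersection cast is needed because java.util.function.Function is not Serializable, and anything a Beam transform captures must be. An equivalent sketch using Beam's own SerializableFunction (already imported in this file) avoids the cast:

SerializableFunction<Instant, Long> getIdFromInstant =
    waitLonger
        ? instant -> instant.getMillis() / 1000
        : instant -> instant.getMillis();

Either way, dividing by the 1 s interval on the slow path keeps the generated row IDs consecutive (0, 1, 2, ...) regardless of which interval is in force.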

// Generates rows with the original schema for row IDs under ORIGINAL_N
// Then generates rows with updated schema for the rest
// Rows with updated schema should only reach the table if ignoreUnknownValues is set,
// and the extra field should be present only when autoSchemaUpdate is set
GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra);
PCollection<Instant> instants =
p.apply(
"Generate Instants",
PeriodicImpulse.create()
.startAt(start)
.stopAt(start.plus(stop))
.withInterval(interval)
.catchUpToNow(false));
PCollection<TableRow> rows =
instants.apply(
"Create TableRows",
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(instant -> generateRowFunc.apply(getIdFromInstant.apply(instant))));

if (changeTableSchema) {
rows =
rows
@@ -356,20 +395,22 @@ private void runStreamingPipelineWithSchemaChange(
WriteResult result = rows.apply("Stream to BigQuery", write);
if (useIgnoreUnknownValues) {
// We ignore the extra fields, so no rows should have been sent to DLQ
-PAssert.that(result.getFailedStorageApiInserts()).empty();
PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
} else {
// When we don't set ignoreUnknownValues, the rows with extra fields should be sent to DLQ.
-PAssert.that(result.getFailedStorageApiInserts())
-.satisfies(new VerifyPCollectionSize(TOTAL_N - numOriginalRows, extraField));
PAssert.that(
String.format("Check DLQ has %s schema errors", TOTAL_N - ORIGINAL_N),
result.getFailedStorageApiInserts())
.satisfies(new VerifyPCollectionSize(TOTAL_N - ORIGINAL_N, extraField));
}
p.run().waitUntilFinish();

// Check row completeness, non-duplication, and that schema update works as intended.
-int expectedCount = useIgnoreUnknownValues ? TOTAL_N : numOriginalRows;
int expectedCount = useIgnoreUnknownValues ? TOTAL_N : ORIGINAL_N;
boolean checkNoDuplication = (method == Write.Method.STORAGE_WRITE_API) ? true : false;
checkRowCompleteness(tableSpec, expectedCount, checkNoDuplication);
if (useIgnoreUnknownValues) {
-checkRowsWithUpdatedSchema(tableSpec, extraField, numOriginalRows, useAutoSchemaUpdate);
checkRowsWithUpdatedSchema(tableSpec, extraField, useAutoSchemaUpdate);
}
}
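For instance, with ignoreUnknownValues unset, the TOTAL_N - ORIGINAL_N = 10 widened rows land in the DLQ and the table should hold exactly ORIGINAL_N = 50 rows; with it set, all 60 rows reach the table, and the extra field is populated only when autoSchemaUpdate was also enabled.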

@@ -386,21 +427,29 @@ private static class VerifyPCollectionSize
@Override
public Void apply(Iterable<BigQueryStorageApiInsertError> input) {
List<BigQueryStorageApiInsertError> itemList = new ArrayList<>();
String expectedError = "SchemaTooNarrowException";
for (BigQueryStorageApiInsertError err : input) {
itemList.add(err);
// Check the error message is due to schema mismatch from the extra field.
-assertTrue(err.getErrorMessage().contains("SchemaTooNarrowException"));
-assertTrue(err.getErrorMessage().contains(extraField));
assertTrue(
String.format(
"Didn't find expected [%s] error in failed message: %s", expectedError, err),
err.getErrorMessage().contains(expectedError));
assertTrue(
String.format(
"Didn't find expected [%s] schema field in failed message: %s", extraField, err),
err.getErrorMessage().contains(extraField));
}
// Check we have the expected number of rows in DLQ.
// Should be equal to number of rows with extra fields.
LOG.info("Found {} failed rows in DLQ", itemList.size());
assertEquals(expectedSize, itemList.size());
return null;
}
}

// Check the expected number of rows reached the table.
-// If appropriate (using STORAGE_WRITE_API), check no duplication happened.
// If using STORAGE_WRITE_API, check no duplication happened.
private static void checkRowCompleteness(
String tableSpec, int expectedCount, boolean checkNoDuplication)
throws IOException, InterruptedException {
@@ -426,23 +475,27 @@ private static void checkRowCompleteness(
// Performs checks on the table's rows under different conditions.
// Note: these should only be performed when ignoreUnknownValues is set.
public void checkRowsWithUpdatedSchema(
-String tableSpec, String extraField, int numOriginalRows, boolean useAutoSchemaUpdate)
String tableSpec, String extraField, boolean useAutoSchemaUpdate)
throws IOException, InterruptedException {
List<TableRow> actualRows =
BQ_CLIENT.queryUnflattened(
String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false);

for (TableRow row : actualRows) {
// Rows written to the table should not have the extra field if
-// 1. The original row didn't have it in the first place
// 1. The row has original schema
// 2. We didn't set autoSchemaUpdate (the extra field would just be dropped)
// 3. We didn't change the table's schema (again, the extra field would be dropped)
-if (Integer.parseInt((String) row.get("id")) < numOriginalRows
if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N
|| !useAutoSchemaUpdate
|| !changeTableSchema) {
-assertTrue(row.get(extraField) == null);
assertTrue(
String.format("Expected row to NOT have field %s:\n%s", extraField, row),
row.get(extraField) == null);
} else {
-assertTrue(row.get(extraField) != null);
assertTrue(
String.format("Expected row to have field %s:\n%s", extraField, row),
row.get(extraField) != null);
}
}
}
