Add integration tests for Storage Write API schema update feature (#27740)

* Add argument checks and tests for BQ StorageAPI sinks.

* Change some argument checking to log a warning message if the check fails.

* Fix formatting after running spotlessCheck and checkStyle.

* Parameterize tests with a useWriteSchema option.

Internally, the tests decide whether to call withSchema() with a schema of
shuffled fields based on this option (see the sketch after this commit message).

* Update some warning messages.

* Fix a few typos in the method name STORAGE_WRITE_API
* Change the warning message when both numStorageWriteApiStreams and autoSharding are set. In this case, autoSharding takes priority.
* Add an argument check for using both numFileShards and autoSharding via FILE_LOADS.

* Add integration tests for automatic schema updates.

* Remove stream observation from the tests.

* Fix a test's expected error message.

* Add a test for ignoreUnknownValues; move exception tests to another file.

* Miscellaneous fixes.

* Clean up unused variables; add a with/without table schema change test dimension.

* Remove a leftover test.

* Update a query.

---------
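As a rough illustration of the useWriteSchema dimension (a sketch under assumptions — variable names such as useWriteSchema and tableSchema are placeholders, not the exact test code):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// Hypothetical sketch: when useWriteSchema is set, pass withSchema() a copy
// of the table schema whose fields have been shuffled, so the test also
// covers schemas whose field order differs from the destination table's.
BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("project:dataset.table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
if (useWriteSchema) {
  List<TableFieldSchema> shuffled = new ArrayList<>(tableSchema.getFields());
  Collections.shuffle(shuffled);
  write = write.withSchema(new TableSchema().setFields(shuffled));
}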

Co-authored-by: Shunping Huang <[email protected]>
ahmedabu98 and shunping authored Aug 8, 2023
1 parent 34cabc0 commit 7824f2c
Showing 6 changed files with 602 additions and 31 deletions.
@@ -3128,32 +3128,80 @@ public WriteResult expand(PCollection<T> input) {
.collect(Collectors.toList())),
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may be set");

// Perform some argument checks
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Write.Method method = resolveMethod(input);
if (input.isBounded() == IsBounded.UNBOUNDED
&& (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API)) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Duration triggeringFrequency =
(method == Write.Method.STORAGE_WRITE_API)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null && getNumFileShards() == 0,
"Triggering frequency or number of file shards can be specified only when writing an"
+ " unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection"
+ " was %s and the method was %s",
input.isBounded(),
method);
if (input.isBounded() == IsBounded.UNBOUNDED) {
if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) {
Duration triggeringFrequency =
(method == Write.Method.STORAGE_WRITE_API)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.",
method);
}
if (method != Method.FILE_LOADS) {
checkArgument(
getNumFileShards() == 0,
"Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.",
method);
}
if (method == Method.STORAGE_API_AT_LEAST_ONCE
&& getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn(
"Storage API triggering frequency option will be ignored is it can only be specified only "
+ "when writing via STORAGE_WRITE_API, but the method was {}.",
method);
}
if (getAutoSharding()) {
if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) {
LOG.warn(
"Both numStorageWriteApiStreams and auto-sharding options are set. Will default to auto-sharding."
+ " To set a fixed number of streams, do not enable auto-sharding.");
} else if (method == Method.FILE_LOADS && getNumFileShards() > 0) {
LOG.warn(
"Both numFileShards and auto-sharding options are set. Will default to auto-sharding."
+ " To set a fixed number of file shards, do not enable auto-sharding.");
} else if (method == Method.STORAGE_API_AT_LEAST_ONCE) {
LOG.warn(
"The setting of auto-sharding is ignored. It is only supported when writing an"
+ " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or"
+ " STORAGE_WRITE_API, but the method was {}.",
method);
}
}
} else { // PCollection is bounded
String error =
String.format(
" is only applicable to an unbounded PCollection, but the input PCollection is %s.",
input.isBounded());
checkArgument(getTriggeringFrequency() == null, "Triggering frequency" + error);
checkArgument(!getAutoSharding(), "Auto-sharding" + error);
checkArgument(getNumFileShards() == 0, "Number of file shards" + error);

if (getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn("Storage API triggering frequency" + error);
}
if (getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn("Setting the number of Storage API streams" + error);
}
}
if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn(
"Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API");
}

if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
checkArgument(
!getAutoSchemaUpdate(),
"withAutoSchemaUpdate only supported when using storage-api writes.");
"withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
}
if (getRowMutationInformationFn() != null) {
checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
@@ -3172,10 +3220,6 @@ public WriteResult expand(PCollection<T> input) {
!getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
}

if (input.isBounded() == IsBounded.BOUNDED) {
checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
}

if (getJsonTimePartitioning() != null) {
checkArgument(
getDynamicDestinations() == null,
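For reference, a write configuration that satisfies the argument checks above for an unbounded input might look like the following (a minimal sketch; unboundedRows, the table name, and the schema are placeholders):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

// Sketch: an unbounded STORAGE_WRITE_API write that passes the argument
// checks. A triggering frequency is required for unbounded FILE_LOADS and
// STORAGE_WRITE_API writes, and auto-sharding supersedes a fixed number of
// streams, so numStorageWriteApiStreams is deliberately left unset.
unboundedRows.apply(
    BigQueryIO.writeTableRows()
        .to("project:dataset.table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withAutoSharding()
        .withSchema(tableSchema));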
@@ -104,7 +104,7 @@ public Value next() {
if (autoUpdateSchema) {
try {
@Nullable TableRow unknownFields = payload.getUnknownFields();
if (unknownFields != null) {
if (unknownFields != null && !unknownFields.isEmpty()) {
// Protocol buffer serialization format supports concatenation. We serialize any new
// "known" fields
// into a proto and concatenate to the existing proto.
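The comment above relies on a standard protocol buffer property: concatenating two serialized messages and parsing the result yields the merge of the two messages. A minimal sketch of that property (the message and descriptor variables are illustrative):

import com.google.common.primitives.Bytes;
import com.google.protobuf.DynamicMessage;

// Serialize the existing "known" fields and the newly known fields
// separately; parsing the concatenation yields the merged message.
byte[] existing = knownFieldsMessage.toByteArray();
byte[] added = newFieldsMessage.toByteArray();
DynamicMessage merged =
    DynamicMessage.parseFrom(descriptor, Bytes.concat(existing, added));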
@@ -78,13 +78,16 @@ private static Result getUpdatedSchema(
return Result.of(newSchema, false);
}

// BigQuery column names are not case-sensitive, but Map keys are.
Map<String, TableFieldSchema> newSchemaMap =
newSchema.stream().collect(Collectors.toMap(TableFieldSchema::getName, x -> x));
newSchema.stream().collect(Collectors.toMap(tr -> tr.getName().toLowerCase(), x -> x));
Set<String> fieldNamesPopulated = Sets.newHashSet();
List<TableFieldSchema> updatedSchema = Lists.newArrayList();
boolean isEquivalent = oldSchema.size() == newSchema.size();
for (TableFieldSchema tableFieldSchema : oldSchema) {
@Nullable TableFieldSchema newTableFieldSchema = newSchemaMap.get(tableFieldSchema.getName());
@Nullable
TableFieldSchema newTableFieldSchema =
newSchemaMap.get(tableFieldSchema.getName().toLowerCase());
if (newTableFieldSchema == null) {
// We don't support deleting fields!
return Result.empty();
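To see why the lowercase normalization matters: without it, a field whose name differs only in case between the old and new schema would miss the map lookup and be treated as a deleted field. A toy example (field names made up):

import com.google.api.services.bigquery.model.TableFieldSchema;
import java.util.HashMap;
import java.util.Map;

// Map keys are case-sensitive, so "userId" and "USERID" would not collide
// without normalization; lowercasing both sides makes the lookup match
// BigQuery's case-insensitive column semantics.
Map<String, TableFieldSchema> byName = new HashMap<>();
byName.put("userId".toLowerCase(), someField); // stored under "userid"
TableFieldSchema match = byName.get("USERID".toLowerCase()); // found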
@@ -293,7 +293,8 @@ public List<TableRow> queryUnflattened(
String query, String projectId, boolean typed, boolean useStandardSql)
throws IOException, InterruptedException {
Random rnd = new Random(System.currentTimeMillis());
String temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
String temporaryDatasetId =
String.format("_dataflow_temporary_dataset_%s_%s", System.nanoTime(), rnd.nextInt(1000000));
String temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
TableReference tempTableReference =
new TableReference()
@@ -570,4 +571,19 @@ public Table getTableResource(String projectId, String datasetId, String tableId
MAX_QUERY_RETRIES, tableId),
lastException);
}

public void updateTableSchema(
String projectId, String datasetId, String tableId, TableSchema newSchema)
throws IOException {
this.bqClient
.tables()
.patch(projectId, datasetId, tableId, new Table().setSchema(newSchema))
.execute();
LOG.info(
"Successfully updated the schema of table {}:{}.{}. New schema:\n{}",
projectId,
datasetId,
tableId,
newSchema.toPrettyString());
}
}
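An illustrative call site for the new helper (the client variable, project, dataset, table, and fields are placeholders):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

// Placeholder coordinates; in the integration tests a call like this widens
// the table schema mid-pipeline so the auto schema update path is exercised.
TableSchema widened =
    new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema().setName("name").setType("STRING"),
                new TableFieldSchema().setName("new_field").setType("INT64")));
bqClient.updateTableSchema("my-project", "my_dataset", "my_table", widened);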
@@ -1854,6 +1854,8 @@ public void testUpdateTableSchemaNoUnknownValues() throws Exception {
assumeTrue(useStreaming);
assumeTrue(useStorageApi);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Auto schema update currently only supported when ignoreUnknownValues also set.");
p.apply("create", Create.empty(TableRowJsonCoder.of()))
.apply(
BigQueryIO.writeTableRows()
@@ -1866,6 +1868,28 @@
p.run();
}

@Test
public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() {
assumeTrue(useStreaming);
assumeTrue(!useStorageApiApproximate);
p.enableAbandonedNodeEnforcement(false);
Method method = useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API");
thrown.expectMessage("triggering frequency must be specified");

p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null);
p.apply(Create.empty(INPUT_RECORD_CODER))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(
BigQueryIO.<InputRecord>write()
.withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
.to("dataset.table")
.withMethod(method)
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
}

@SuppressWarnings({"unused"})
static class UpdateTableSchemaDoFn extends DoFn<KV<String, TableRow>, TableRow> {
@TimerId("updateTimer")
@@ -2140,17 +2164,17 @@ public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() {

@Test
public void testWriteValidateFailsWithBatchAutoSharding() {
assumeTrue(!useStorageApi);
assumeTrue(!useStreaming);
p.enableAbandonedNodeEnforcement(false);

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Auto-sharding is only applicable to unbounded input.");
thrown.expectMessage(
"Auto-sharding is only applicable to an unbounded PCollection, but the input PCollection is BOUNDED.");
p.apply(Create.empty(INPUT_RECORD_CODER))
.apply(
BigQueryIO.<InputRecord>write()
.to("dataset.table")
.withSchema(new TableSchema())
.withMethod(Method.STREAMING_INSERTS)
.withAutoSharding()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
}
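For contrast with testUpdateTableSchemaNoUnknownValues above, a configuration that passes the auto schema update validation pairs the two options (a sketch; the table name is a placeholder):

// Sketch: withAutoSchemaUpdate requires ignoreUnknownValues as well, since
// rows may carry fields the current table schema does not yet know about.
BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withAutoSchemaUpdate(true)
    .ignoreUnknownValues();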