Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add argument checks and tests for BQ StorageAPI sinks. #27213

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3138,16 +3138,44 @@ public WriteResult expand(PCollection<T> input) {
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, "
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null && getNumFileShards() == 0,
"Triggering frequency or number of file shards can be specified only when writing an"
+ " unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection"
+ " was %s and the method was %s",
getTriggeringFrequency() == null,
"Triggering frequency can be specified only when writing an unbounded PCollection via"
+ " FILE_LOADS or STORAGE_WRITE_API, but the collection was %s and the method was"
+ " %s.",
input.isBounded(),
method);

if (method == Method.STORAGE_WRITE_API) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
if (getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn(
"The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored."
+ " It is only supported when writing an unbounded PCollection via"
+ " STORAGE_WRITE_API, but the collection was {} and the method was {}.",
Comment on lines +3156 to +3158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could be more concise; using STORAGE_WRITE_API isn't the problem here

input.isBounded(),
method);
}
}

checkArgument(
(getNumFileShards() == 0),
"Number of file shards can be specified only when writing an unbounded PCollection via"
+ " FILE_LOADS, but the collection was %s and the method was %s",
input.isBounded(),
method);

if (getNumStorageWriteApiStreams() != 0) {
LOG.warn(
"The setting of numStorageWriteApiStreams is ignored. It can be specified only"
+ " when writing an unbounded PCollection via STORAGE_WRITE_API, but the collection"
+ " was {} and the method was {}.",
input.isBounded(),
method);
}
}

if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
Expand All @@ -3174,6 +3202,27 @@ public WriteResult expand(PCollection<T> input) {

if (input.isBounded() == IsBounded.BOUNDED) {
checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
} else {
if (method == Method.STORAGE_WRITE_API) {
if (getNumStorageWriteApiStreams() > 0 && getAutoSharding()) {
LOG.warn(
"The setting of numStorageWriteApiStreams is ignored. It is only supported when"
+ " autoSharding is not enabled.");
}
} else if (method == Method.FILE_LOADS) {
if (getNumFileShards() > 0 && getAutoSharding()) {
LOG.warn(
"The setting of numFileShards is ignored. It is only supported when autoSharding is"
+ " not enabled.");
}
} else if (method != Method.STREAMING_INSERTS) {
LOG.warn(
"The setting of auto-sharding is ignored. It is only supported when writing an"
+ " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or"
+ " STORAGE_WRITE_API, but the collection was {} and the method was {}.",
input.isBounded(),
method);
}
Comment on lines +3218 to +3225
Copy link
Contributor

@ahmedabu98 ahmedabu98 Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this point it makes more sense to check method == STORAGE_API_AT_LEAST_ONCE? and have a warning message specific for that method. also needs a check that autosharding is enabled here

}

if (getJsonTimePartitioning() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,17 @@ public Table getTableResource(String projectId, String datasetId, String tableId
MAX_QUERY_RETRIES, tableId),
lastException);
}

public void updateTableSchema(
String projectId, String datasetId, String tableId, TableSchema newSchema) {
try {
this.bqClient
.tables()
.patch(projectId, datasetId, tableId, new Table().setSchema(newSchema))
.execute();
LOG.info("Successfully updated the schema of table: " + tableId);
} catch (Exception e) {
LOG.debug("Exceptions caught when updating table schema: " + e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity should maybe be raised to info/warning for visibility?

}
}
}
Loading