Skip to content

Commit

Permalink
[BigQueryIO] fetch updated schema for newly created Storage API stream writers (#33231)
Browse files Browse the repository at this point in the history

* add dynamic dest test

* fix and add some tests

* add to changes.md

* fix whitespace

* trigger postcommits

* address comments
  • Loading branch information
ahmedabu98 authored Jan 8, 2025
1 parent 40151ab commit b4c3a4f
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [BigQueryIO] Fixed an issue where the Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException, InterruptedException;

@Nullable
WriteStream getWriteStream(String writeStream);
TableSchema getWriteStreamSchema(String writeStream);

/**
* Create an append client for a given Storage API write stream. The stream must be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
Expand All @@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -1418,8 +1420,15 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
}

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
public @Nullable TableSchema getWriteStreamSchema(String writeStream) {
@Nullable
WriteStream stream =
newWriteClient.getWriteStream(
GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
return (stream != null && stream.hasTableSchema()) ? stream.getTableSchema() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -475,15 +474,18 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
() -> {
if (autoUpdateSchema) {
@Nullable
WriteStream writeStream =
TableSchema streamSchema =
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
.getWriteStreamSchema(streamName);
if (streamSchema != null) {
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(initialTableSchema, streamSchema);
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,28 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
TableSchema streamSchema =
MoreObjects.firstNonNull(
writeStreamService.getWriteStreamSchema(getOrCreateStream.get()),
TableSchema.getDefaultInstance());
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,11 @@ public WriteStream createWriteStream(String tableUrn, Type type) throws Interrup

@Override
@Nullable
public WriteStream getWriteStream(String streamName) {
public com.google.cloud.bigquery.storage.v1.TableSchema getWriteStreamSchema(String streamName) {
synchronized (FakeDatasetService.class) {
@Nullable Stream stream = writeStreams.get(streamName);
if (stream != null) {
return stream.toWriteStream();
return stream.toWriteStream().getTableSchema();
}
}
// TODO(relax): Return the exact error that BigQuery returns.
Expand Down
Loading

0 comments on commit b4c3a4f

Please sign in to comment.