Skip to content

Commit

Permalink
Handle schema updates in Storage API writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Jan 19, 2023
1 parent fafeaa9 commit 7ad44c8
Show file tree
Hide file tree
Showing 18 changed files with 958 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand All @@ -28,35 +35,111 @@
* StorageApiWritesShardedRecords} to enapsulate a destination {@link TableSchema} along with a
* {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
*/
class AppendClientInfo {
@Nullable BigQueryServices.StreamAppendClient streamAppendClient;
@Nullable TableSchema tableSchema;
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
Descriptors.Descriptor descriptor;
@AutoValue
abstract class AppendClientInfo {
abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

public AppendClientInfo(
abstract TableSchema getTableSchema();

abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();

abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();

abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation();

abstract Descriptors.Descriptor getDescriptor();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);

abstract Builder setTableSchema(TableSchema value);

abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);

abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);

abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);

abstract Builder setDescriptor(Descriptors.Descriptor value);

abstract AppendClientInfo build();
};

abstract Builder toBuilder();

static AppendClientInfo of(
TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
throws Exception {
this.tableSchema = tableSchema;
this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
this.closeAppendClient = closeAppendClient;
return new AutoValue_AppendClientInfo.Builder()
.setTableSchema(tableSchema)
.setCloseAppendClient(closeAppendClient)
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
.setSchemaInformation(
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
.setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true))
.build();
}

public AppendClientInfo createAppendClient(
public AppendClientInfo withNoAppendClient() {
return toBuilder().setStreamAppendClient(null).build();
}

public AppendClientInfo withAppendClient(
BigQueryServices.DatasetService datasetService,
Supplier<String> getStreamName,
boolean useConnectionPool)
throws Exception {
if (streamAppendClient == null) {
this.streamAppendClient =
datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
if (getStreamAppendClient() != null) {
return this;
} else {
return toBuilder()
.setStreamAppendClient(
datasetService.getStreamAppendClient(
getStreamName.get(), getDescriptor(), useConnectionPool))
.build();
}
return this;
}

public void close() {
if (streamAppendClient != null) {
closeAppendClient.accept(streamAppendClient);
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
}
}

boolean hasSchemaChanged(TableSchema updatedTableSchema) {
return updatedTableSchema.hashCode() != getTableSchema().hashCode();
}

public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
throws TableRowToStorageApiProto.SchemaConversionException {
Message msg =
TableRowToStorageApiProto.messageFromTableRow(
getSchemaInformation(),
getDescriptorIgnoreRequired(),
unknown,
ignoreUnknownValues,
true,
null);
return msg.toByteString();
}

@Memoized
Descriptors.Descriptor getDescriptorIgnoreRequired() {
try {
return TableRowToStorageApiProto.getDescriptorFromTableSchema(getTableSchema(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public TableRow toTableRow(ByteString protoBytes) {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(getDescriptor(), protoBytes));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,15 @@ public WriteResult expand(PCollection<T> input) {
"withAutoSchemaUpdate only supported when using storage-api writes.");
}

if (getAutoSchemaUpdate()) {
// TODO(reuvenlax): Remove this restriction once we implement support.
checkArgument(
getIgnoreUnknownValues(),
"Auto schema update currently only supported when ignoreUnknownValues also set.");
checkArgument(
!getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
}

if (method != Write.Method.FILE_LOADS) {
// we only support writing avro for FILE_LOADS
checkArgument(
Expand Down Expand Up @@ -3172,11 +3181,12 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getCreateDisposition(),
getIgnoreUnknownValues());
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
}

StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<DestinationT, T>(
new StorageApiLoads<>(
destinationCoder,
storageApiDynamicDestinations,
getCreateDisposition(),
Expand All @@ -3185,7 +3195,9 @@ private <DestinationT> WriteResult continueExpandTyped(
getBigQueryServices(),
getStorageApiNumStreams(bqOptions),
method == Method.STORAGE_API_AT_LEAST_ONCE,
getAutoSharding());
getAutoSharding(),
getAutoSchemaUpdate(),
getIgnoreUnknownValues());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
Expand Down Expand Up @@ -204,6 +205,9 @@ Table patchTableDescription(TableReference tableReference, @Nullable String tabl
WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException, InterruptedException;

@Nullable
WriteStream getWriteStream(String writeStream);

/**
* Create an append client for a given Storage API write stream. The stream must be created
* first.
Expand All @@ -230,6 +234,10 @@ interface StreamAppendClient extends AutoCloseable {
/** Append rows to a Storage API write stream at the given offset. */
ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;

/** If the table schema has been updated, returns the new schema. Otherwise returns null. */
@Nullable
TableSchema getUpdatedSchema();

/**
* If the previous call to appendRows blocked due to flow control, returns how long the call
* blocked for.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
Expand Down Expand Up @@ -1309,6 +1310,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
.build());
}

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
}

@Override
public StreamAppendClient getStreamAppendClient(
String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception {
Expand Down Expand Up @@ -1378,6 +1384,11 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
return streamWriter.append(rows, offset);
}

@Override
public TableSchema getUpdatedSchema() {
return streamWriter.getUpdatedSchema();
}

@Override
public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -74,7 +70,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -1036,28 +1031,4 @@ private static Object convertAvroNumeric(Object value) {
public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}

/**
* Hashes a schema descriptor using a deterministic hash function.
*
* <p>Warning! These hashes are encoded into messages, so changing this function will cause
* pipelines to get stuck on update!
*/
public static long hashSchemaDescriptorDeterministic(Descriptor descriptor) {
long hashCode = 0;
for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
hashCode +=
Hashing.murmur3_32()
.hashString(fieldDescriptor.getName(), StandardCharsets.UTF_8)
.asInt();
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRepeated() ? 1 : 0).asInt();
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRequired() ? 1 : 0).asInt();
Type type = fieldDescriptor.getType();
hashCode += Hashing.murmur3_32().hashInt(type.ordinal()).asInt();
if (type.equals(Type.MESSAGE)) {
hashCode += hashSchemaDescriptorDeterministic(fieldDescriptor.getMessageType());
}
}
return hashCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,50 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
* parameter controls how many rows are batched into a single ProtoRows object before we move on to
* the next one.
*/
class SplittingIterable implements Iterable<ProtoRows> {
interface ConvertUnknownFields {
ByteString convert(TableRow tableRow, boolean ignoreUnknownValues)
throws TableRowToStorageApiProto.SchemaConversionException;
}

private final Iterable<StorageApiWritePayload> underlying;
private final long splitSize;

public SplittingIterable(Iterable<StorageApiWritePayload> underlying, long splitSize) {
private final ConvertUnknownFields unknownFieldsToMessage;
private final Function<ByteString, TableRow> protoToTableRow;
private final BiConsumer<TableRow, String> failedRowsConsumer;
private final boolean autoUpdateSchema;
private final boolean ignoreUnknownValues;

public SplittingIterable(
Iterable<StorageApiWritePayload> underlying,
long splitSize,
ConvertUnknownFields unknownFieldsToMessage,
Function<ByteString, TableRow> protoToTableRow,
BiConsumer<TableRow, String> failedRowsConsumer,
boolean autoUpdateSchema,
boolean ignoreUnknownValues) {
this.underlying = underlying;
this.splitSize = splitSize;
this.unknownFieldsToMessage = unknownFieldsToMessage;
this.protoToTableRow = protoToTableRow;
this.failedRowsConsumer = failedRowsConsumer;
this.autoUpdateSchema = autoUpdateSchema;
this.ignoreUnknownValues = ignoreUnknownValues;
}

@Override
Expand All @@ -57,7 +84,37 @@ public ProtoRows next() {
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
ByteString byteString = ByteString.copyFrom(payload.getPayload());

if (autoUpdateSchema) {
try {
@Nullable TableRow unknownFields = payload.getUnknownFields();
if (unknownFields != null) {
// Protocol buffer serialization format supports concatenation. We serialize any new
// "known" fields
// into a proto and concatenate to the existing proto.
try {
byteString =
byteString.concat(
unknownFieldsToMessage.convert(unknownFields, ignoreUnknownValues));
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
// This generally implies that ignoreUnknownValues=false and there were still
// unknown values here.
// Reconstitute the TableRow and send it to the failed-rows consumer.
TableRow tableRow = protoToTableRow.apply(byteString);
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
// only execute this
// codepath when ignoreUnknownFields==true, so we should never hit this codepath.
// However once
// 24926 is fixed, we need to merge the unknownFields back into the main row
// before outputting to the
// failed-rows consumer.
failedRowsConsumer.accept(tableRow, e.toString());
continue;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
inserts.addSerializedRows(byteString);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface MessageConverter<T> {

StorageApiWritePayload toMessage(T element) throws Exception;

StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception;

TableRow toTableRow(T element);
}

Expand Down
Loading

0 comments on commit 7ad44c8

Please sign in to comment.