diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index cb9575b4ff43..010083f5569e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -17,8 +17,15 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.services.bigquery.model.TableRow; +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -28,35 +35,111 @@ * StorageApiWritesShardedRecords} to enapsulate a destination {@link TableSchema} along with a * {@link BigQueryServices.StreamAppendClient} and other objects needed to write records. */ -class AppendClientInfo { - @Nullable BigQueryServices.StreamAppendClient streamAppendClient; - @Nullable TableSchema tableSchema; - Consumer closeAppendClient; - Descriptors.Descriptor descriptor; +@AutoValue +abstract class AppendClientInfo { + abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient(); - public AppendClientInfo( + abstract TableSchema getTableSchema(); + + abstract Consumer getCloseAppendClient(); + + abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema(); + + abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation(); + + abstract Descriptors.Descriptor getDescriptor(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value); + + abstract Builder setTableSchema(TableSchema value); + + abstract Builder setCloseAppendClient(Consumer value); + + abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value); + + abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value); + + abstract Builder setDescriptor(Descriptors.Descriptor value); + + abstract AppendClientInfo build(); + }; + + abstract Builder toBuilder(); + + static AppendClientInfo of( TableSchema tableSchema, Consumer closeAppendClient) throws Exception { - this.tableSchema = tableSchema; - this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true); - this.closeAppendClient = closeAppendClient; + return new AutoValue_AppendClientInfo.Builder() + .setTableSchema(tableSchema) + .setCloseAppendClient(closeAppendClient) + .setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema)) + .setSchemaInformation( + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema)) + .setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true)) + .build(); } - public AppendClientInfo createAppendClient( + public AppendClientInfo withNoAppendClient() { + return toBuilder().setStreamAppendClient(null).build(); + } + + public AppendClientInfo withAppendClient( BigQueryServices.DatasetService datasetService, Supplier getStreamName, 
boolean useConnectionPool) throws Exception { - if (streamAppendClient == null) { - this.streamAppendClient = - datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool); + if (getStreamAppendClient() != null) { + return this; + } else { + return toBuilder() + .setStreamAppendClient( + datasetService.getStreamAppendClient( + getStreamName.get(), getDescriptor(), useConnectionPool)) + .build(); } - return this; } public void close() { - if (streamAppendClient != null) { - closeAppendClient.accept(streamAppendClient); + BigQueryServices.StreamAppendClient client = getStreamAppendClient(); + if (client != null) { + getCloseAppendClient().accept(client); + } + } + + boolean hasSchemaChanged(TableSchema updatedTableSchema) { + return updatedTableSchema.hashCode() != getTableSchema().hashCode(); + } + + public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues) + throws TableRowToStorageApiProto.SchemaConversionException { + Message msg = + TableRowToStorageApiProto.messageFromTableRow( + getSchemaInformation(), + getDescriptorIgnoreRequired(), + unknown, + ignoreUnknownValues, + true, + null); + return msg.toByteString(); + } + + @Memoized + Descriptors.Descriptor getDescriptorIgnoreRequired() { + try { + return TableRowToStorageApiProto.getDescriptorFromTableSchema(getTableSchema(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public TableRow toTableRow(ByteString protoBytes) { + try { + return TableRowToStorageApiProto.tableRowFromMessage( + DynamicMessage.parseFrom(getDescriptor(), protoBytes)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index fedc898fe973..5e032d96962a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2878,6 +2878,15 @@ public WriteResult expand(PCollection input) { "withAutoSchemaUpdate only supported when using storage-api writes."); } + if (getAutoSchemaUpdate()) { + // TODO(reuvenlax): Remove this restriction once we implement support. 
+ checkArgument( + getIgnoreUnknownValues(), + "Auto schema update currently only supported when ignoreUnknownValues also set."); + checkArgument( + !getUseBeamSchema(), "Auto schema update not supported when using Beam schemas."); + } + if (method != Write.Method.FILE_LOADS) { // we only support writing avro for FILE_LOADS checkArgument( @@ -3172,11 +3181,12 @@ private WriteResult continueExpandTyped( dynamicDestinations, tableRowWriterFactory.getToRowFn(), getCreateDisposition(), - getIgnoreUnknownValues()); + getIgnoreUnknownValues(), + getAutoSchemaUpdate()); } StorageApiLoads storageApiLoads = - new StorageApiLoads( + new StorageApiLoads<>( destinationCoder, storageApiDynamicDestinations, getCreateDisposition(), @@ -3185,7 +3195,9 @@ private WriteResult continueExpandTyped( getBigQueryServices(), getStorageApiNumStreams(bqOptions), method == Method.STORAGE_API_AT_LEAST_ONCE, - getAutoSharding()); + getAutoSharding(), + getAutoSchemaUpdate(), + getIgnoreUnknownValues()); return input.apply("StorageApiLoads", storageApiLoads); } else { throw new RuntimeException("Unexpected write method " + method); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index adf02ed31a6b..6f178bd61504 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -41,6 +41,7 @@ import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.protobuf.Descriptors.Descriptor; import java.io.IOException; @@ -204,6 +205,9 @@ Table patchTableDescription(TableReference tableReference, @Nullable String tabl WriteStream createWriteStream(String tableUrn, WriteStream.Type type) throws IOException, InterruptedException; + @Nullable + WriteStream getWriteStream(String writeStream); + /** * Create an append client for a given Storage API write stream. The stream must be created * first. @@ -230,6 +234,10 @@ interface StreamAppendClient extends AutoCloseable { /** Append rows to a Storage API write stream at the given offset. */ ApiFuture appendRows(long offset, ProtoRows rows) throws Exception; + /** If the table schema has been updated, returns the new schema. Otherwise returns null. */ + @Nullable + TableSchema getUpdatedSchema(); + /** * If the previous call to appendRows blocked due to flow control, returns how long the call * blocked for. 
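For context, the precondition added to BigQueryIO.expand above means auto schema update is only usable on the Storage Write API path, with ignoreUnknownValues also enabled, and not together with Beam schemas. A minimal, hypothetical usage sketch under those constraints follows; the project/dataset/table spec and the input row are placeholders and not part of this change, and only public BigQueryIO.Write methods referenced by this patch (withMethod, ignoreUnknownValues, withAutoSchemaUpdate, withCreateDisposition) are used:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class AutoSchemaUpdateExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder input; in practice this would typically be an unbounded source.
    PCollection<TableRow> rows =
        p.apply(
            Create.of(new TableRow().set("name", "a").set("number", 1L))
                .withCoder(TableRowJsonCoder.of()));

    rows.apply(
        BigQueryIO.writeTableRows()
            .to("project-id:dataset-id.table") // placeholder table spec
            // Auto schema update is only supported on the Storage Write API path.
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            // Required by the new check: unknown fields are carried along rather than rejected,
            // so they can be re-encoded once an updated table schema is observed.
            .ignoreUnknownValues()
            .withAutoSchemaUpdate(true)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run().waitUntilFinish();
  }
}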
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 289087c1d469..6365bad2569b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -82,6 +82,7 @@ import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -1309,6 +1310,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type) .build()); } + @Override + public @Nullable WriteStream getWriteStream(String writeStream) { + return newWriteClient.getWriteStream(writeStream); + } + @Override public StreamAppendClient getStreamAppendClient( String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception { @@ -1378,6 +1384,11 @@ public ApiFuture appendRows(long offset, ProtoRows rows) return streamWriter.append(rows, offset); } + @Override + public TableSchema getUpdatedSchema() { + return streamWriter.getUpdatedSchema(); + } + @Override public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 773b3af9673c..f0caa958df94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -28,13 +28,9 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.value.AutoValue; -import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.Descriptors.FieldDescriptor.Type; import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -74,7 +70,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; @@ -1036,28 +1031,4 @@ private static Object convertAvroNumeric(Object value) { public static ServiceCallMetric writeCallMetric(TableReference tableReference) { return callMetricForMethod(tableReference, "BigQueryBatchWrite"); } - - /** - * Hashes a schema descriptor using a deterministic hash function. 
- *
- * <p>
Warning! These hashes are encoded into messages, so changing this function will cause - * pipelines to get stuck on update! - */ - public static long hashSchemaDescriptorDeterministic(Descriptor descriptor) { - long hashCode = 0; - for (FieldDescriptor fieldDescriptor : descriptor.getFields()) { - hashCode += - Hashing.murmur3_32() - .hashString(fieldDescriptor.getName(), StandardCharsets.UTF_8) - .asInt(); - hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRepeated() ? 1 : 0).asInt(); - hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRequired() ? 1 : 0).asInt(); - Type type = fieldDescriptor.getType(); - hashCode += Hashing.murmur3_32().hashInt(type.ordinal()).asInt(); - if (type.equals(Type.MESSAGE)) { - hashCode += hashSchemaDescriptorDeterministic(fieldDescriptor.getMessageType()); - } - } - return hashCode; - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java index 03b009797b54..4b4978bb30fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java @@ -17,10 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.protobuf.ByteString; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.function.BiConsumer; +import java.util.function.Function; +import javax.annotation.Nullable; /** * Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize @@ -28,12 +32,35 @@ * the next one. */ class SplittingIterable implements Iterable { + interface ConvertUnknownFields { + ByteString convert(TableRow tableRow, boolean ignoreUnknownValues) + throws TableRowToStorageApiProto.SchemaConversionException; + } + private final Iterable underlying; private final long splitSize; - public SplittingIterable(Iterable underlying, long splitSize) { + private final ConvertUnknownFields unknownFieldsToMessage; + private final Function protoToTableRow; + private final BiConsumer failedRowsConsumer; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; + + public SplittingIterable( + Iterable underlying, + long splitSize, + ConvertUnknownFields unknownFieldsToMessage, + Function protoToTableRow, + BiConsumer failedRowsConsumer, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.underlying = underlying; this.splitSize = splitSize; + this.unknownFieldsToMessage = unknownFieldsToMessage; + this.protoToTableRow = protoToTableRow; + this.failedRowsConsumer = failedRowsConsumer; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -57,7 +84,37 @@ public ProtoRows next() { while (underlyingIterator.hasNext()) { StorageApiWritePayload payload = underlyingIterator.next(); ByteString byteString = ByteString.copyFrom(payload.getPayload()); - + if (autoUpdateSchema) { + try { + @Nullable TableRow unknownFields = payload.getUnknownFields(); + if (unknownFields != null) { + // Protocol buffer serialization format supports concatenation. We serialize any new + // "known" fields + // into a proto and concatenate to the existing proto. 
+ try { + byteString = + byteString.concat( + unknownFieldsToMessage.convert(unknownFields, ignoreUnknownValues)); + } catch (TableRowToStorageApiProto.SchemaConversionException e) { + // This generally implies that ignoreUnknownValues=false and there were still + // unknown values here. + // Reconstitute the TableRow and send it to the failed-rows consumer. + TableRow tableRow = protoToTableRow.apply(byteString); + // TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we + // only execute this + // codepath when ignoreUnknownFields==true, so we should never hit this codepath. + // However once + // 24926 is fixed, we need to merge the unknownFields back into the main row + // before outputting to the + // failed-rows consumer. + failedRowsConsumer.accept(tableRow, e.toString()); + continue; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } inserts.addSerializedRows(byteString); bytesSize += byteString.size(); if (bytesSize > splitSize) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java index c3076e8af863..966384da632a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java @@ -35,6 +35,8 @@ public interface MessageConverter { StorageApiWritePayload toMessage(T element) throws Exception; + StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception; + TableRow toTableRow(T element); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java index 4280d356bd2a..6e9f75d15dfe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java @@ -61,9 +61,15 @@ public TableSchema getTableSchema() { } @Override + @SuppressWarnings("nullness") public StorageApiWritePayload toMessage(T element) { Message msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, toRow.apply(element)); - return new AutoValue_StorageApiWritePayload(msg.toByteArray()); + return new AutoValue_StorageApiWritePayload(msg.toByteArray(), null); + } + + @Override + public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) { + throw new RuntimeException("Not supported"); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index 6797bd20e682..61c930a00bb6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -37,6 +37,7 @@ public class 
StorageApiDynamicDestinationsTableRow formatFunction; private final CreateDisposition createDisposition; private final boolean ignoreUnknownValues; + private final boolean autoSchemaUpdates; private static final TableSchemaCache SCHEMA_CACHE = new TableSchemaCache(Duration.standardSeconds(1)); @@ -48,11 +49,13 @@ public class StorageApiDynamicDestinationsTableRow inner, SerializableFunction formatFunction, CreateDisposition createDisposition, - boolean ignoreUnknownValues) { + boolean ignoreUnknownValues, + boolean autoSchemaUpdates) { super(inner); this.formatFunction = formatFunction; this.createDisposition = createDisposition; this.ignoreUnknownValues = ignoreUnknownValues; + this.autoSchemaUpdates = autoSchemaUpdates; } static void clearSchemaCache() throws ExecutionException, InterruptedException { @@ -115,9 +118,11 @@ class TableRowConverter implements MessageConverter { this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema); schemaInformation = TableRowToStorageApiProto.SchemaInformation.fromTableSchema(protoTableSchema); + // If autoSchemaUpdates == true, then generate a descriptor where all the fields are optional. + // This allows us to support field relaxation downstream. descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema( - Preconditions.checkStateNotNull(tableSchema), true); + Preconditions.checkStateNotNull(tableSchema), !autoSchemaUpdates); } @Override @@ -135,12 +140,23 @@ public StorageApiWritePayload toMessage(T element) throws Exception { return toMessage(formatFunction.apply(element), true); } + @Override public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception { + // If autoSchemaUpdates==true, then we allow unknown values at this step and insert them into + // the unknownFields variable. This allows us to handle schema updates in the write stage. + boolean ignoreUnknown = ignoreUnknownValues || autoSchemaUpdates; + @Nullable TableRow unknownFields = autoSchemaUpdates ? 
new TableRow() : null; + boolean allowMissingFields = autoSchemaUpdates; Message msg = TableRowToStorageApiProto.messageFromTableRow( - schemaInformation, descriptor, tableRow, ignoreUnknownValues); - return StorageApiWritePayload.of(msg.toByteArray()); + schemaInformation, + descriptor, + tableRow, + ignoreUnknown, + allowMissingFields, + unknownFields); + return StorageApiWritePayload.of(msg.toByteArray(), unknownFields); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index da2f695f7087..a8d133c0c3a9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -54,6 +54,8 @@ public class StorageApiLoads private final int numShards; private final boolean allowInconsistentWrites; private final boolean allowAutosharding; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; public StorageApiLoads( Coder destinationCoder, @@ -64,7 +66,9 @@ public StorageApiLoads( BigQueryServices bqServices, int numShards, boolean allowInconsistentWrites, - boolean allowAutosharding) { + boolean allowAutosharding, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.destinationCoder = destinationCoder; this.dynamicDestinations = dynamicDestinations; this.createDisposition = createDisposition; @@ -74,6 +78,8 @@ public StorageApiLoads( this.numShards = numShards; this.allowInconsistentWrites = allowInconsistentWrites; this.allowAutosharding = allowAutosharding; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -126,7 +132,9 @@ public WriteResult expandInconsistent( dynamicDestinations, bqServices, failedRowsTag, - BigQueryStorageApiInsertErrorCoder.of())); + BigQueryStorageApiInsertErrorCoder.of(), + autoUpdateSchema, + ignoreUnknownValues)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) @@ -200,7 +208,9 @@ public WriteResult expandTriggered( bqServices, destinationCoder, BigQueryStorageApiInsertErrorCoder.of(), - failedRowsTag)); + failedRowsTag, + autoUpdateSchema, + ignoreUnknownValues)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) @@ -271,7 +281,9 @@ public WriteResult expandUntriggered( dynamicDestinations, bqServices, failedRowsTag, - BigQueryStorageApiInsertErrorCoder.of())); + BigQueryStorageApiInsertErrorCoder.of(), + autoUpdateSchema, + ignoreUnknownValues)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java index 00a34b9c14f7..85a0c3b4fe61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java @@ -17,10 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; +import 
com.google.auto.value.extension.memoized.Memoized; import java.io.IOException; +import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.util.CoderUtils; /** Class used to wrap elements being sent to the Storage API sinks. */ @AutoValue @@ -29,7 +33,24 @@ public abstract class StorageApiWritePayload { @SuppressWarnings("mutable") public abstract byte[] getPayload(); - static StorageApiWritePayload of(byte[] payload) throws IOException { - return new AutoValue_StorageApiWritePayload(payload); + @SuppressWarnings("mutable") + public abstract @Nullable byte[] getUnknownFieldsPayload(); + + @SuppressWarnings("nullness") + static StorageApiWritePayload of(byte[] payload, @Nullable TableRow unknownFields) + throws IOException { + @Nullable byte[] unknownFieldsPayload = null; + if (unknownFields != null) { + unknownFieldsPayload = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), unknownFields); + } + return new AutoValue_StorageApiWritePayload(payload, unknownFieldsPayload); + } + + public @Memoized @Nullable TableRow getUnknownFields() throws IOException { + @Nullable byte[] fields = getUnknownFieldsPayload(); + if (fields == null) { + return null; + } + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), fields); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index 190525925aec..343b7e1c81a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -40,16 +40,22 @@ public class StorageApiWriteRecordsInconsistent private final TupleTag failedRowsTag; private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; public StorageApiWriteRecordsInconsistent( StorageApiDynamicDestinations dynamicDestinations, BigQueryServices bqServices, TupleTag failedRowsTag, - Coder failedRowsCoder) { + Coder failedRowsCoder, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; this.failedRowsCoder = failedRowsCoder; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -70,7 +76,9 @@ public PCollectionTuple expand(PCollection private final TupleTag failedRowsTag; private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); /** @@ -138,11 +141,15 @@ public StorageApiWriteUnshardedRecords( StorageApiDynamicDestinations dynamicDestinations, BigQueryServices bqServices, TupleTag failedRowsTag, - Coder failedRowsCoder) { + Coder failedRowsCoder, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; this.failedRowsCoder = 
failedRowsCoder; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -165,7 +172,9 @@ public PCollectionTuple expand(PCollection private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes"); private final TupleTag> finalizeTag; private final TupleTag failedRowsTag; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; static class AppendRowsContext extends RetryManager.Operation.Context { long offset; @@ -216,7 +227,7 @@ class DestinationState { "rowsSentToFailedRowsCollection"); private final boolean useDefaultStream; - private TableSchema tableSchema; + private TableSchema initialTableSchema; private Instant nextCacheTickle = Instant.MAX; private final int clientNumber; private final boolean usingMultiplexing; @@ -235,7 +246,7 @@ public DestinationState( this.pendingMessages = Lists.newArrayList(); this.maybeDatasetService = datasetService; this.useDefaultStream = useDefaultStream; - this.tableSchema = messageConverter.getTableSchema(); + this.initialTableSchema = messageConverter.getTableSchema(); this.clientNumber = new Random().nextInt(streamAppendClientCount); this.usingMultiplexing = usingMultiplexing; this.maxRequestSize = maxRequestSize; @@ -244,8 +255,9 @@ public DestinationState( void teardown() { maybeTickleCache(); if (appendClientInfo != null) { - if (appendClientInfo.streamAppendClient != null) { - runAsyncIgnoreFailure(closeWriterExecutor, appendClientInfo.streamAppendClient::unpin); + StreamAppendClient client = appendClientInfo.getStreamAppendClient(); + if (client != null) { + runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); } appendClientInfo = null; } @@ -278,10 +290,11 @@ String getOrCreateStreamName() { return this.streamName; } - AppendClientInfo generateClient() throws Exception { - Preconditions.checkStateNotNull(maybeDatasetService); + AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exception { + TableSchema tableSchema = + (updatedSchema != null) ? updatedSchema : getCurrentTableSchema(streamName); AppendClientInfo appendClientInfo = - new AppendClientInfo( + AppendClientInfo.of( tableSchema, // Make sure that the client is always closed in a different thread to avoid // blocking. @@ -289,21 +302,37 @@ AppendClientInfo generateClient() throws Exception { runAsyncIgnoreFailure( closeWriterExecutor, () -> { - // Remove the pin owned by the cache. synchronized (APPEND_CLIENTS) { + // Remove the pin owned by the cache. client.unpin(); client.close(); } })); appendClientInfo = - appendClientInfo.createAppendClient( - maybeDatasetService, () -> streamName, usingMultiplexing); + appendClientInfo.withAppendClient( + Preconditions.checkStateNotNull(maybeDatasetService), + () -> streamName, + usingMultiplexing); // This pin is "owned" by the cache. 
- Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient).pin(); + Preconditions.checkStateNotNull(appendClientInfo.getStreamAppendClient()).pin(); return appendClientInfo; } - AppendClientInfo getAppendClientInfo(boolean lookupCache) { + TableSchema getCurrentTableSchema(String stream) { + TableSchema currentSchema = initialTableSchema; + if (autoUpdateSchema) { + @Nullable + WriteStream writeStream = + Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName); + if (writeStream != null && writeStream.hasTableSchema()) { + currentSchema = writeStream.getTableSchema(); + } + } + return currentSchema; + } + + AppendClientInfo getAppendClientInfo( + boolean lookupCache, final @Nullable TableSchema updatedSchema) { try { if (this.appendClientInfo == null) { getOrCreateStreamName(); @@ -311,20 +340,21 @@ AppendClientInfo getAppendClientInfo(boolean lookupCache) { synchronized (APPEND_CLIENTS) { if (lookupCache) { newAppendClientInfo = - APPEND_CLIENTS.get(getStreamAppendClientCacheEntryKey(), this::generateClient); + APPEND_CLIENTS.get( + getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)); } else { - newAppendClientInfo = generateClient(); + newAppendClientInfo = generateClient(updatedSchema); // override the clients in the cache. APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newAppendClientInfo); } // This pin is "owned" by the current DoFn. - Preconditions.checkStateNotNull(newAppendClientInfo.streamAppendClient).pin(); + Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin(); } this.currentOffset = 0; nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); this.appendClientInfo = newAppendClientInfo; } - return Preconditions.checkStateNotNull(this.appendClientInfo); + return Preconditions.checkStateNotNull(appendClientInfo); } catch (Exception e) { throw new RuntimeException(e); } @@ -343,9 +373,9 @@ void invalidateWriteStream() { if (appendClientInfo != null) { synchronized (APPEND_CLIENTS) { // Unpin in a different thread, as it may execute a blocking close. - if (appendClientInfo.streamAppendClient != null) { - runAsyncIgnoreFailure( - closeWriterExecutor, appendClientInfo.streamAppendClient::unpin); + StreamAppendClient client = appendClientInfo.getStreamAppendClient(); + if (client != null) { + runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); } // The default stream is cached across multiple different DoFns. If they all try and // invalidate, then we can get races between threads invalidating and recreating @@ -366,9 +396,37 @@ void invalidateWriteStream() { } } - void addMessage(StorageApiWritePayload payload) throws Exception { + void addMessage( + StorageApiWritePayload payload, + OutputReceiver failedRowsReceiver) + throws Exception { maybeTickleCache(); ByteString payloadBytes = ByteString.copyFrom(payload.getPayload()); + if (autoUpdateSchema) { + if (appendClientInfo == null) { + appendClientInfo = getAppendClientInfo(true, null); + } + @Nullable TableRow unknownFields = payload.getUnknownFields(); + if (unknownFields != null) { + try { + payloadBytes = + payloadBytes.concat( + Preconditions.checkStateNotNull(appendClientInfo) + .encodeUnknownFields(unknownFields, ignoreUnknownValues)); + } catch (TableRowToStorageApiProto.SchemaConversionException e) { + TableRow tableRow = appendClientInfo.toTableRow(payloadBytes); + // TODO(24926, reuvenlax): We need to merge the unknown fields in! 
Currently we only + // execute this + // codepath when ignoreUnknownFields==true, so we should never hit this codepath. + // However once + // 24926 is fixed, we need to merge the unknownFields back into the main row before + // outputting to the + // failed-rows consumer. + failedRowsReceiver.output(new BigQueryStorageApiInsertError(tableRow, e.toString())); + return; + } + } + } pendingMessages.add(payloadBytes); } @@ -400,7 +458,8 @@ long flush( for (ByteString rowBytes : inserts.getSerializedRowsList()) { TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(getAppendClientInfo(true).descriptor, rowBytes)); + DynamicMessage.parseFrom( + getAppendClientInfo(true, null).getDescriptor(), rowBytes)); failedRowsReceiver.output( new BigQueryStorageApiInsertError( failedRow, "Row payload too large. Maximum size " + maxRequestSize)); @@ -424,7 +483,8 @@ long flush( } try { StreamAppendClient writeStream = - Preconditions.checkStateNotNull(getAppendClientInfo(true).streamAppendClient); + Preconditions.checkStateNotNull( + getAppendClientInfo(true, null).getStreamAppendClient()); ApiFuture response = writeStream.appendRows(c.offset, c.protoRows); inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds()); @@ -456,7 +516,7 @@ long flush( TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage( DynamicMessage.parseFrom( - Preconditions.checkStateNotNull(appendClientInfo).descriptor, + Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(), protoBytes)); failedRowsReceiver.output( new BigQueryStorageApiInsertError( @@ -498,9 +558,7 @@ long flush( appendFailures.inc(); return RetryType.RETRY_ALL_OPERATIONS; }, - c -> { - recordsAppended.inc(c.protoRows.getSerializedRowsCount()); - }, + c -> recordsAppended.inc(c.protoRows.getSerializedRowsCount()), appendRowsContext); maybeTickleCache(); return inserts.getSerializedRowsCount(); @@ -519,6 +577,22 @@ String retrieveErrorDetails(Iterable failedContext) { .collect(Collectors.joining("\n"))) .collect(Collectors.joining("\n")); } + + void postFlush() { + // If we got a response indicating an updated schema, recreate the client. + if (this.appendClientInfo != null) { + @Nullable + StreamAppendClient streamAppendClient = appendClientInfo.getStreamAppendClient(); + @Nullable + TableSchema updatedTableSchema = + (streamAppendClient != null) ? 
streamAppendClient.getUpdatedSchema() : null; + if (updatedTableSchema != null) { + invalidateWriteStream(); + appendClientInfo = + Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema)); + } + } + } } private @Nullable Map destinations = Maps.newHashMap(); @@ -542,7 +616,9 @@ String retrieveErrorDetails(Iterable failedContext) { int flushThresholdCount, int streamAppendClientCount, TupleTag> finalizeTag, - TupleTag failedRowsTag) { + TupleTag failedRowsTag, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; @@ -552,6 +628,8 @@ String retrieveErrorDetails(Iterable failedContext) { this.streamAppendClientCount = streamAppendClientCount; this.finalizeTag = finalizeTag; this.failedRowsTag = failedRowsTag; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } boolean shouldFlush() { @@ -591,6 +669,10 @@ void flushAll(OutputReceiver failedRowsReceiver) retryManager.await(); } } + for (DestinationState destinationState : + Preconditions.checkStateNotNull(destinations).values()) { + destinationState.postFlush(); + } numPendingRecords = 0; numPendingRecordBytes = 0; } @@ -654,8 +736,10 @@ public void process( k -> createDestinationState( c, k, initializedDatasetService, pipelineOptions.as(BigQueryOptions.class))); - flushIfNecessary(o.get(failedRowsTag)); - state.addMessage(element.getValue()); + + OutputReceiver failedRowsReceiver = o.get(failedRowsTag); + flushIfNecessary(failedRowsReceiver); + state.addMessage(element.getValue(), failedRowsReceiver); ++numPendingRecords; numPendingRecordBytes += element.getValue().getPayload().length; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 6be3e491cd04..55786673017e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -29,8 +29,6 @@ import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.ByteString; -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; import io.grpc.Status; import io.grpc.Status.Code; import java.io.IOException; @@ -49,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient; @@ -119,6 +118,8 @@ public class StorageApiWritesShardedRecords destinationCoder; private final Coder failedRowsCoder; + private final boolean autoUpdateSchema; + private final boolean ignoreUnknownValues; private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME; private final TupleTag failedRowsTag; private final TupleTag> flushTag = new TupleTag<>("flushTag"); @@ -193,7 +194,9 @@ public 
StorageApiWritesShardedRecords( BigQueryServices bqServices, Coder destinationCoder, Coder failedRowsCoder, - TupleTag failedRowsTag) { + TupleTag failedRowsTag, + boolean autoUpdateSchema, + boolean ignoreUnknownValues) { this.dynamicDestinations = dynamicDestinations; this.createDisposition = createDisposition; this.kmsKey = kmsKey; @@ -201,6 +204,8 @@ public StorageApiWritesShardedRecords( this.destinationCoder = destinationCoder; this.failedRowsCoder = failedRowsCoder; this.failedRowsTag = failedRowsTag; + this.autoUpdateSchema = autoUpdateSchema; + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -288,6 +293,10 @@ class WriteRecordsDoFn @StateId("streamOffset") private final StateSpec> streamOffsetSpec = StateSpecs.value(); + @StateId("updatedSchema") + private final StateSpec> updatedSchema = + StateSpecs.value(ProtoCoder.of(TableSchema.class)); + @TimerId("idleTimer") private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); @@ -361,11 +370,16 @@ public void process( @Element KV, Iterable> element, final @AlwaysFetched @StateId("streamName") ValueState streamName, final @AlwaysFetched @StateId("streamOffset") ValueState streamOffset, + final @StateId("updatedSchema") ValueState updatedSchema, @TimerId("idleTimer") Timer idleTimer, final MultiOutputReceiver o) throws Exception { BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class); + if (autoUpdateSchema) { + updatedSchema.readLater(); + } + dynamicDestinations.setSideInputAccessorFromProcessContext(c); TableDestination tableDestination = destinations.computeIfAbsent( @@ -385,43 +399,68 @@ public void process( Supplier getOrCreateStream = () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService); - Function getAppendClientInfo = - createAppendClient -> { - try { - @Nullable - TableSchema tableSchema = - messageConverters - .get(element.getKey().getKey(), dynamicDestinations, datasetService) - .getTableSchema(); - AppendClientInfo info = - new AppendClientInfo( - tableSchema, - // Make sure that the client is always closed in a different thread to avoid - // blocking. - client -> - runAsyncIgnoreFailure( - closeWriterExecutor, - () -> { - // Remove the pin that is "owned" by the cache. - client.unpin(); - client.close(); - })); - if (createAppendClient) { - info = info.createAppendClient(datasetService, getOrCreateStream, false); - // This pin is "owned" by the cache. - Preconditions.checkStateNotNull(info.streamAppendClient).pin(); - } - return info; - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - AtomicReference appendClientInfo = new AtomicReference<>( - APPEND_CLIENTS.get(element.getKey(), () -> getAppendClientInfo.apply(true))); + APPEND_CLIENTS.get( + element.getKey(), + () -> { + @Nullable TableSchema tableSchema; + if (autoUpdateSchema && updatedSchema.read() != null) { + // We've seen an updated schema, so we use that. + tableSchema = updatedSchema.read(); + } else { + // Start off with the base schema. As we get notified of schema updates, we + // will update the + // descriptor. + tableSchema = + messageConverters + .get(element.getKey().getKey(), dynamicDestinations, datasetService) + .getTableSchema(); + } + AppendClientInfo info = + AppendClientInfo.of( + Preconditions.checkStateNotNull(tableSchema), + // Make sure that the client is always closed in a different thread + // to + // avoid blocking. 
+ client -> + runAsyncIgnoreFailure( + closeWriterExecutor, + () -> { + // Remove the pin that is "owned" by the cache. + client.unpin(); + client.close(); + })) + .withAppendClient(datasetService, getOrCreateStream, false); + // This pin is "owned" by the cache. + Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin(); + return info; + })); + TableSchema updatedSchemaValue = updatedSchema.read(); + if (autoUpdateSchema && updatedSchemaValue != null) { + if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) { + appendClientInfo.set( + AppendClientInfo.of( + updatedSchemaValue, appendClientInfo.get().getCloseAppendClient())); + APPEND_CLIENTS.invalidate(element.getKey()); + APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); + } + } - Iterable messages = new SplittingIterable(element.getValue(), splitSize); + // Each ProtoRows object contains at most 1MB of rows. + // TODO: Push messageFromTableRow up to top level. That we we cans skip TableRow entirely if + // already proto or already schema. + Iterable messages = + new SplittingIterable( + element.getValue(), + splitSize, + (fields, ignore) -> appendClientInfo.get().encodeUnknownFields(fields, ignore), + bytes -> appendClientInfo.get().toTableRow(bytes), + (failedRow, errorMessage) -> + o.get(failedRowsTag) + .output(new BigQueryStorageApiInsertError(failedRow, errorMessage)), + autoUpdateSchema, + ignoreUnknownValues); // Initialize stream names and offsets for all contexts. This will be called initially, but // will also be called if we roll over to a new stream on a retry. @@ -432,15 +471,19 @@ public void process( // Clear the stream name, forcing a new one to be created. streamName.write(""); } - appendClientInfo.get().createAppendClient(datasetService, getOrCreateStream, false); + appendClientInfo.set( + appendClientInfo + .get() + .withAppendClient(datasetService, getOrCreateStream, false)); StreamAppendClient streamAppendClient = - Preconditions.checkArgumentNotNull(appendClientInfo.get().streamAppendClient); + Preconditions.checkArgumentNotNull( + appendClientInfo.get().getStreamAppendClient()); String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); for (AppendRowsContext context : contexts) { context.streamName = streamNameRead; streamAppendClient.pin(); - context.client = appendClientInfo.get().streamAppendClient; + context.client = appendClientInfo.get().getStreamAppendClient(); context.offset = currentOffset; ++context.tryIteration; currentOffset = context.offset + context.protoRows.getSerializedRowsCount(); @@ -454,9 +497,8 @@ public void process( Consumer> clearClients = contexts -> { APPEND_CLIENTS.invalidate(element.getKey()); - appendClientInfo.set(getAppendClientInfo.apply(false)); + appendClientInfo.set(appendClientInfo.get().withNoAppendClient()); APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); - for (AppendRowsContext context : contexts) { if (context.client != null) { // Unpin in a different thread, as it may execute a blocking close. 
@@ -474,8 +516,11 @@ public void process( return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()); } try { - appendClientInfo.get().createAppendClient(datasetService, getOrCreateStream, false); - return Preconditions.checkStateNotNull(appendClientInfo.get().streamAppendClient) + appendClientInfo.set( + appendClientInfo + .get() + .withAppendClient(datasetService, getOrCreateStream, false)); + return Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()) .appendRows(context.offset, context.protoRows); } catch (Exception e) { throw new RuntimeException(e); @@ -502,19 +547,11 @@ public void process( for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - try { - TableRow failedRow = - TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(appendClientInfo.get().descriptor, protoBytes)); - new BigQueryStorageApiInsertError( - failedRow, error.getRowIndexToErrorMessage().get(failedIndex)); - o.get(failedRowsTag) - .output( - new BigQueryStorageApiInsertError( - failedRow, error.getRowIndexToErrorMessage().get(failedIndex))); - } catch (InvalidProtocolBufferException e) { - LOG.error("Failed to insert row and could not parse the result!"); - } + TableRow failedRow = appendClientInfo.get().toTableRow(protoBytes); + o.get(failedRowsTag) + .output( + new BigQueryStorageApiInsertError( + failedRow, error.getRowIndexToErrorMessage().get(failedIndex))); } rowsSentToFailedRowsCollection.inc(failedRowIndices.size()); @@ -591,6 +628,7 @@ public void process( Consumer onSuccess = context -> { + AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult()); o.get(flushTag) .output( KV.of( @@ -619,9 +657,7 @@ public void process( + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection."); } for (ByteString rowBytes : protoRows.getSerializedRowsList()) { - TableRow failedRow = - TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(appendClientInfo.get().descriptor, rowBytes)); + TableRow failedRow = appendClientInfo.get().toTableRow(rowBytes); o.get(failedRowsTag) .output( new BigQueryStorageApiInsertError( @@ -652,6 +688,23 @@ public void process( } appendSplitDistribution.update(numAppends); + if (autoUpdateSchema) { + @Nullable + StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient(); + ; + @Nullable + TableSchema newSchema = + (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; + // Update the table schema and clear the append client. 
+ if (newSchema != null) { + appendClientInfo.set( + AppendClientInfo.of(newSchema, appendClientInfo.get().getCloseAppendClient())); + APPEND_CLIENTS.invalidate(element.getKey()); + APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); + updatedSchema.write(newSchema); + } + } + java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); appendLatencyDistribution.update(timeElapsed.toMillis()); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 19559e750771..95a8027516d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -19,6 +19,7 @@ import static java.util.stream.Collectors.toList; +import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.cloud.bigquery.storage.v1.TableFieldSchema; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.beam.sdk.util.Preconditions; @@ -410,13 +412,18 @@ public static DynamicMessage messageFromMap( SchemaInformation schemaInformation, Descriptor descriptor, AbstractMap map, - boolean ignoreUnknownValues) + boolean ignoreUnknownValues, + boolean allowMissingRequiredFields, + @Nullable TableRow unknownFields) throws SchemaConversionException { DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); for (final Map.Entry entry : map.entrySet()) { @Nullable FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase()); if (fieldDescriptor == null) { + if (unknownFields != null) { + unknownFields.set(entry.getKey().toLowerCase(), entry.getValue()); + } if (ignoreUnknownValues) { continue; } else { @@ -430,10 +437,23 @@ public static DynamicMessage messageFromMap( SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(entry.getKey()); try { + Supplier<@Nullable TableRow> getNestedUnknown = + () -> + (unknownFields == null) + ? null + : (TableRow) + unknownFields.computeIfAbsent( + entry.getKey().toLowerCase(), k -> new TableRow()); + @Nullable Object value = messageValueFromFieldValue( - fieldSchemaInformation, fieldDescriptor, entry.getValue(), ignoreUnknownValues); + fieldSchemaInformation, + fieldDescriptor, + entry.getValue(), + ignoreUnknownValues, + allowMissingRequiredFields, + getNestedUnknown); if (value != null) { builder.setField(fieldDescriptor, value); } @@ -458,11 +478,14 @@ public static DynamicMessage messageFromMap( * Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data * using the BigQuery Storage API. 
*/ + @SuppressWarnings("nullness") public static DynamicMessage messageFromTableRow( SchemaInformation schemaInformation, Descriptor descriptor, TableRow tableRow, - boolean ignoreUnknownValues) + boolean ignoreUnknownValues, + boolean allowMissingRequiredFields, + final @Nullable TableRow unknownFields) throws SchemaConversionException { @Nullable Object fValue = tableRow.get("f"); if (fValue instanceof List) { @@ -479,15 +502,41 @@ public static DynamicMessage messageFromTableRow( } } + if (unknownFields != null) { + List unknownValues = Lists.newArrayListWithExpectedSize(cells.size()); + for (int i = 0; i < cells.size(); ++i) { + unknownValues.add(new TableCell().setV(null)); + } + unknownFields.setF(unknownValues); + } + for (int i = 0; i < cellsToProcess; ++i) { AbstractMap cell = cells.get(i); FieldDescriptor fieldDescriptor = descriptor.getFields().get(i); SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(i); try { + final int finalIndex = i; + Supplier<@Nullable TableRow> getNestedUnknown = + () -> { + TableRow localUnknownFields = Preconditions.checkStateNotNull(unknownFields); + @Nullable + TableRow nested = (TableRow) (localUnknownFields.getF().get(finalIndex).getV()); + if (nested == null) { + nested = new TableRow(); + localUnknownFields.getF().set(finalIndex, new TableCell().setV(nested)); + } + return nested; + }; + @Nullable Object value = messageValueFromFieldValue( - fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnknownValues); + fieldSchemaInformation, + fieldDescriptor, + cell.get("v"), + ignoreUnknownValues, + allowMissingRequiredFields, + getNestedUnknown); if (value != null) { builder.setField(fieldDescriptor, value); } @@ -501,6 +550,13 @@ public static DynamicMessage messageFromTableRow( } } + // If there are unknown fields, copy them into the output. + if (unknownFields != null) { + for (int i = cellsToProcess; i < cells.size(); ++i) { + unknownFields.getF().set(i, new TableCell().setV(cells.get(i).get("v"))); + } + } + try { return builder.build(); } catch (Exception e) { @@ -508,7 +564,13 @@ public static DynamicMessage messageFromTableRow( "Could convert schema for " + schemaInformation.getFullName(), e); } } else { - return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnknownValues); + return messageFromMap( + schemaInformation, + descriptor, + tableRow, + ignoreUnknownValues, + allowMissingRequiredFields, + unknownFields); } } @@ -575,10 +637,12 @@ private static void fieldDescriptorFromTableField( SchemaInformation schemaInformation, FieldDescriptor fieldDescriptor, @Nullable Object bqValue, - boolean ignoreUnknownValues) + boolean ignoreUnknownValues, + boolean allowMissingRequiredFields, + Supplier<@Nullable TableRow> getUnknownNestedFields) throws SchemaConversionException { if (bqValue == null) { - if (fieldDescriptor.isOptional()) { + if (fieldDescriptor.isOptional() || allowMissingRequiredFields) { return null; } else if (fieldDescriptor.isRepeated()) { return Collections.emptyList(); @@ -595,13 +659,23 @@ private static void fieldDescriptorFromTableField( if (o != null) { // repeated field cannot contain null. 
protoList.add( singularFieldToProtoValue( - schemaInformation, fieldDescriptor, o, ignoreUnknownValues)); + schemaInformation, + fieldDescriptor, + o, + ignoreUnknownValues, + allowMissingRequiredFields, + getUnknownNestedFields)); } } return protoList; } return singularFieldToProtoValue( - schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues); + schemaInformation, + fieldDescriptor, + bqValue, + ignoreUnknownValues, + allowMissingRequiredFields, + getUnknownNestedFields); } @VisibleForTesting @@ -609,7 +683,9 @@ private static void fieldDescriptorFromTableField( SchemaInformation schemaInformation, FieldDescriptor fieldDescriptor, @Nullable Object value, - boolean ignoreUnknownValues) + boolean ignoreUnknownValues, + boolean allowMissingRequiredFields, + Supplier<@Nullable TableRow> getUnknownNestedFields) throws SchemaConversionException { switch (schemaInformation.getType()) { case INT64: @@ -770,12 +846,22 @@ private static void fieldDescriptorFromTableField( if (value instanceof TableRow) { TableRow tableRow = (TableRow) value; return messageFromTableRow( - schemaInformation, fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues); + schemaInformation, + fieldDescriptor.getMessageType(), + tableRow, + ignoreUnknownValues, + allowMissingRequiredFields, + getUnknownNestedFields.get()); } else if (value instanceof AbstractMap) { // This will handle nested rows. AbstractMap map = ((AbstractMap) value); return messageFromMap( - schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues); + schemaInformation, + fieldDescriptor.getMessageType(), + map, + ignoreUnknownValues, + allowMissingRequiredFields, + getUnknownNestedFields.get()); } break; default: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 948c75cb756d..266de15d4be1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -95,16 +95,35 @@ static class Stream { final Type type; long nextFlushPosition; boolean finalized; + TableSchema currentSchema; + @Nullable TableSchema updatedSchema = null; Stream(String streamName, TableContainer tableContainer, Type type) { this.streamName = streamName; this.stream = Lists.newArrayList(); this.tableContainer = tableContainer; + this.currentSchema = tableContainer.getTable().getSchema(); this.type = type; this.finalized = false; this.nextFlushPosition = 0; } + void setUpdatedSchema(TableSchema tableSchema) { + this.updatedSchema = tableSchema; + } + + TableSchema getUpdatedSchema() { + return this.updatedSchema; + } + + WriteStream toWriteStream() { + return WriteStream.newBuilder() + .setName(streamName) + .setType(type) + .setTableSchema(TableRowToStorageApiProto.schemaToProtoTableSchema(currentSchema)) + .build(); + } + long finalizeStream() { this.finalized = true; return stream.size(); @@ -324,6 +343,12 @@ public void updateTableSchema(TableReference tableReference, TableSchema tableSc } // TODO: Only allow "legal" schema changes. 
tableContainer.table.setSchema(tableSchema); + + for (Stream stream : writeStreams.values()) { + if (stream.tableContainer == tableContainer) { + stream.setUpdatedSchema(tableSchema); + } + } } } @@ -495,19 +520,38 @@ public WriteStream createWriteStream(String tableUrn, Type type) tableReference.getDatasetId(), tableReference.getTableId()); String streamName = UUID.randomUUID().toString(); - writeStreams.put(streamName, new Stream(streamName, tableContainer, type)); - return WriteStream.newBuilder().setName(streamName).build(); + Stream stream = new Stream(streamName, tableContainer, type); + writeStreams.put(streamName, stream); + return stream.toWriteStream(); } } + @Override + @Nullable + public WriteStream getWriteStream(String streamName) { + synchronized (FakeDatasetService.class) { + @Nullable Stream stream = writeStreams.get(streamName); + if (stream != null) { + return stream.toWriteStream(); + } + } + return null; + } + @Override public StreamAppendClient getStreamAppendClient( String streamName, Descriptor descriptor, boolean useConnectionPool) { return new StreamAppendClient() { private Descriptor protoDescriptor; + private TableSchema currentSchema; + private @Nullable com.google.cloud.bigquery.storage.v1.TableSchema updatedSchema; { this.protoDescriptor = descriptor; + synchronized (FakeDatasetService.class) { + Stream stream = writeStreams.get(streamName); + currentSchema = stream.tableContainer.getTable().getSchema(); + } } @Override @@ -545,10 +589,25 @@ public ApiFuture appendRows(long offset, ProtoRows rows) rowIndexToErrorMessage)); } stream.appendRows(offset, tableRows); + if (stream.getUpdatedSchema() != null) { + com.google.cloud.bigquery.storage.v1.TableSchema newSchema = + TableRowToStorageApiProto.schemaToProtoTableSchema(stream.getUpdatedSchema()); + responseBuilder.setUpdatedSchema(newSchema); + if (this.updatedSchema == null) { + this.updatedSchema = newSchema; + } + } } return ApiFutures.immediateFuture(responseBuilder.build()); } + @Override + public com.google.cloud.bigquery.storage.v1.@org.checkerframework.checker.nullness.qual + .Nullable + TableSchema getUpdatedSchema() { + return this.updatedSchema; + } + @Override public void close() throws Exception {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index b654b84d6a75..ab91f109c4d0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -66,9 +66,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; +import java.util.function.LongFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -81,6 +83,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -98,6 +101,10 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -112,6 +119,7 @@ import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -1851,6 +1859,214 @@ public TableRow apply(Long input) { p.run(); } + @Test + public void testUpdateTableSchemaUseSet() throws Exception { + updateTableSchemaTest(true); + } + + @Test + public void testUpdateTableSchemaUseSetF() throws Exception { + updateTableSchemaTest(false); + } + + @Test + public void testUpdateTableSchemaNoUnknownValues() throws Exception { + assumeTrue(useStreaming); + assumeTrue(useStorageApi); + thrown.expect(IllegalArgumentException.class); + p.apply("create", Create.empty(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to(BigQueryHelpers.parseTableSpec("project-id:dataset-id.table")) + .withMethod(Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) + .withAutoSchemaUpdate(true) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + } + + @SuppressWarnings({"unused"}) + static class UpdateTableSchemaDoFn extends DoFn, TableRow> { + @TimerId("updateTimer") + private final TimerSpec updateTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final Duration timerOffset; + private final String updatedSchema; + private final FakeDatasetService fakeDatasetService; + + UpdateTableSchemaDoFn( + Duration timerOffset, TableSchema updatedSchema, FakeDatasetService fakeDatasetService) { + this.timerOffset = timerOffset; + this.updatedSchema = BigQueryHelpers.toJsonString(updatedSchema); + this.fakeDatasetService = fakeDatasetService; + } + + @ProcessElement + public void processElement( + @Element KV element, + @TimerId("updateTimer") Timer updateTimer, + OutputReceiver o) + throws IOException { + updateTimer.offset(timerOffset).setRelative(); + o.output(element.getValue()); + } + + @OnTimer("updateTimer") + public void onTimer(@Key String tableSpec) throws IOException { + fakeDatasetService.updateTableSchema( + BigQueryHelpers.parseTableSpec(tableSpec), + BigQueryHelpers.fromJsonString(updatedSchema, TableSchema.class)); + } + } + + public void updateTableSchemaTest(boolean useSet) throws Exception { + assumeTrue(useStreaming); + assumeTrue(useStorageApi); + + // Make sure that GroupIntoBatches does not buffer data. + p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(1); + p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(1); + + BigQueryIO.Write.Method method = + useStorageApiApproximate ? 
Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; + p.enableAbandonedNodeEnforcement(false); + + TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset-id.table"); + TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED"))); + TableSchema tableSchemaUpdated = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("req").setType("STRING"), + new TableFieldSchema().setName("double_number").setType("INTEGER"))); + fakeDatasetService.createTable(new Table().setTableReference(tableRef).setSchema(tableSchema)); + + LongFunction getRowSet = + (LongFunction & Serializable) + (long i) -> { + TableRow row = + new TableRow() + .set("name", "name" + i) + .set("number", Long.toString(i)) + .set("double_number", Long.toString(i * 2)); + if (i <= 5) { + row = row.set("req", "foo"); + } + return row; + }; + + LongFunction getRowSetF = + (LongFunction & Serializable) + (long i) -> + new TableRow() + .setF( + ImmutableList.of( + new TableCell().setV("name" + i), + new TableCell().setV(Long.toString(i)), + new TableCell().setV(i > 5 ? null : "foo"), + new TableCell().setV(Long.toString(i * 2)))); + + LongFunction getRow = useSet ? getRowSet : getRowSetF; + + TestStream.Builder testStream = + TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0)); + // These rows contain unknown fields, which should be dropped. + for (long i = 0; i < 5; i++) { + testStream = testStream.addElements(i); + } + // Expire the timer, which should update the schema. + testStream = testStream.advanceProcessingTime(Duration.standardSeconds(10)); + // Add one element to trigger discovery of new schema. + testStream = testStream.addElements(5L); + testStream = testStream.advanceProcessingTime(Duration.standardSeconds(10)); + + // Now all fields should be known. 
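
Background note on what this test exercises (not part of the test body): with the FakeDatasetService changes earlier in this diff, an appendRows call made after updateTableSchema has run returns the new schema, and the append client caches it. A rough sketch of that interaction, where checkForSchemaUpdate is a hypothetical helper and datasetService, streamName, descriptor and protoRows are assumed to be set up by the caller:

    // Sketch only: observe a schema change through the StreamAppendClient.
    static void checkForSchemaUpdate(
        BigQueryServices.DatasetService datasetService,
        String streamName,
        com.google.protobuf.Descriptors.Descriptor descriptor,
        com.google.cloud.bigquery.storage.v1.ProtoRows protoRows)
        throws Exception {
      BigQueryServices.StreamAppendClient client =
          datasetService.getStreamAppendClient(streamName, descriptor, /* useConnectionPool= */ false);
      client.appendRows(0, protoRows).get();
      com.google.cloud.bigquery.storage.v1.TableSchema updated = client.getUpdatedSchema();
      if (updated != null) {
        // A writer would regenerate its descriptor from "updated" and start sending
        // the newly added columns instead of treating them as unknown fields.
      }
    }
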
+ for (long i = 6; i < 10; i++) { + testStream = testStream.addElements(i); + } + + PCollection tableRows = + p.apply(testStream.advanceWatermarkToInfinity()) + .apply("getRow", MapElements.into(TypeDescriptor.of(TableRow.class)).via(getRow::apply)) + .apply("add key", WithKeys.of("project-id:dataset-id.table")) + .apply( + "update schema", + ParDo.of( + new UpdateTableSchemaDoFn( + Duration.standardSeconds(5), tableSchemaUpdated, fakeDatasetService))) + .setCoder(TableRowJsonCoder.of()); + + tableRows.apply( + BigQueryIO.writeTableRows() + .to(tableRef) + .withMethod(method) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) + .ignoreUnknownValues() + .withAutoSchemaUpdate(true) + .withTestServices(fakeBqServices) + .withoutValidation()); + + p.run(); + + Iterable expectedDroppedValues = + LongStream.range(0, 6) + .mapToObj(getRowSet) + .map(tr -> filterUnknownValues(tr, tableSchema.getFields())) + .collect(Collectors.toList()); + Iterable expectedFullValues = + LongStream.range(6, 10).mapToObj(getRowSet).collect(Collectors.toList()); + assertThat( + fakeDatasetService.getAllRows( + tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), + containsInAnyOrder( + Iterables.toArray( + Iterables.concat(expectedDroppedValues, expectedFullValues), TableRow.class))); + } + + TableRow filterUnknownValues(TableRow row, List tableSchemaFields) { + Map schemaTypes = + tableSchemaFields.stream() + .collect(Collectors.toMap(TableFieldSchema::getName, TableFieldSchema::getType)); + Map> schemaFields = + tableSchemaFields.stream() + .filter(tf -> tf.getFields() != null && !tf.getFields().isEmpty()) + .collect(Collectors.toMap(TableFieldSchema::getName, TableFieldSchema::getFields)); + TableRow filtered = new TableRow(); + if (row.getF() != null) { + List values = Lists.newArrayList(); + for (int i = 0; i < tableSchemaFields.size(); ++i) { + String fieldType = tableSchemaFields.get(i).getType(); + Object value = row.getF().get(i).getV(); + if (fieldType.equals("STRUCT") || fieldType.equals("RECORD")) { + value = filterUnknownValues((TableRow) value, tableSchemaFields.get(i).getFields()); + } + values.add(new TableCell().setV(value)); + } + filtered = filtered.setF(values); + } else { + for (Map.Entry entry : row.entrySet()) { + Object value = entry.getValue(); + @Nullable String fieldType = schemaTypes.get(entry.getKey()); + if (fieldType != null) { + if (fieldType.equals("STRUCT") || fieldType.equals("RECORD")) { + value = filterUnknownValues((TableRow) value, schemaFields.get(entry.getKey())); + } + filtered = filtered.set(entry.getKey(), value); + } + } + } + return filtered; + } + @Test public void testBigQueryIOGetName() { assertEquals( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index da803298d035..d143315ee59d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -751,7 +751,7 @@ public void testMessageFromTableRow() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA); DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( - schemaInformation, descriptor, tableRow, false); + 
schemaInformation, descriptor, tableRow, false, false, null); assertEquals(4, msg.getAllFields().size()); Map fieldDescriptors = @@ -771,7 +771,7 @@ public void testMessageWithFFromTableRow() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA); DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( - schemaInformation, descriptor, BASE_TABLE_ROW, false); + schemaInformation, descriptor, BASE_TABLE_ROW, false, false, null); assertBaseRecord(msg, true); } @@ -814,7 +814,7 @@ public void testRepeatedDescriptorFromTableSchema() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA); DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( - schemaInformation, descriptor, repeatedRow, false); + schemaInformation, descriptor, repeatedRow, false, false, null); assertEquals(4, msg.getAllFields().size()); Map fieldDescriptors = @@ -859,7 +859,7 @@ public void testNullRepeatedDescriptorFromTableSchema() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA); DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( - schemaInformation, descriptor, repeatedRow, false); + schemaInformation, descriptor, repeatedRow, false, false, null); Map fieldDescriptors = descriptor.getFields().stream() @@ -913,7 +913,7 @@ public void testIntegerTypeConversion() throws DescriptorValidationException { try { Object converted = TableRowToStorageApiProto.singularFieldToProtoValue( - fieldSchema, fieldDescriptor, sourceValue, false); + fieldSchema, fieldDescriptor, sourceValue, false, false, () -> null); assertEquals(expectedConvertedValue, converted); } catch (SchemaConversionException e) { fail( @@ -959,7 +959,7 @@ public void testIntegerTypeConversion() throws DescriptorValidationException { String expectedError = (String) invalidValue[1]; try { TableRowToStorageApiProto.singularFieldToProtoValue( - fieldSchema, fieldDescriptor, sourceValue, false); + fieldSchema, fieldDescriptor, sourceValue, false, false, () -> null); fail( "Expected to throw an exception converting " + sourceValue @@ -984,7 +984,8 @@ public void testRejectUnknownField() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA_NO_F); thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class); - TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, row, false, false, null); } @Test @@ -1000,7 +1001,8 @@ public void testRejectUnknownFieldF() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA); thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class); - TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, row, false, false, null); } @Test @@ -1017,7 +1019,8 @@ public void testRejectUnknownNestedField() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA); thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class); - TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, topRow, false, false, null); } @Test @@ -1035,6 +1038,78 @@ public void 
testRejectUnknownNestedFieldF() throws Exception { TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA); thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class); - TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false); + + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, topRow, false, false, null); + } + + @Test + public void testIgnoreUnknownField() throws Exception { + TableRow row = new TableRow(); + row.putAll(BASE_TABLE_ROW_NO_F); + row.set("unknown", "foobar"); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA_NO_F, true); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA_NO_F); + + TableRow ignored = new TableRow(); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, row, true, false, ignored); + assertEquals(1, ignored.size()); + assertEquals("foobar", ignored.get("unknown")); + } + + @Test + public void testIgnoreUnknownFieldF() throws Exception { + TableRow row = new TableRow(); + List cells = Lists.newArrayList(BASE_TABLE_ROW.getF()); + cells.add(new TableCell().setV("foobar")); + row.setF(cells); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA, true); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA); + + TableRow ignored = new TableRow(); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, row, true, false, ignored); + assertEquals(BASE_TABLE_ROW.getF().size() + 1, ignored.getF().size()); + assertEquals("foobar", ignored.getF().get(BASE_TABLE_ROW.getF().size()).getV()); + } + + @Test + public void testIgnoreUnknownNestedField() throws Exception { + TableRow rowNoF = new TableRow(); + rowNoF.putAll(BASE_TABLE_ROW_NO_F); + rowNoF.set("unknown", "foobar"); + TableRow rowWithF = new TableRow(); + List cells = Lists.newArrayList(BASE_TABLE_ROW.getF()); + cells.add(new TableCell().setV("foobar")); + rowWithF.setF(cells); + TableRow topRow = + new TableRow() + .set("nestedValueNoF1", rowNoF) + .set("nestedValue1", rowWithF) + .set("unknowntop", "foobar"); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA); + + TableRow unknown = new TableRow(); + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, topRow, true, false, unknown); + assertEquals(3, unknown.size()); + assertEquals("foobar", unknown.get("unknowntop")); + assertEquals(1, ((TableRow) unknown.get("nestedvalue1")).size()); + assertEquals(1, ((TableRow) unknown.get("nestedvaluenof1")).size()); + assertEquals( + "foobar", + ((TableRow) unknown.get("nestedvalue1")).getF().get(BASE_TABLE_ROW.getF().size()).getV()); + assertEquals("foobar", ((TableRow) unknown.get("nestedvaluenof1")).get("unknown")); } }
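
To summarise what the new tests exercise end to end, a hedged configuration sketch: "rows" stands for any PCollection<TableRow> and the table spec is a placeholder. The pairing of ignoreUnknownValues() with withAutoSchemaUpdate(true) follows the new testUpdateTableSchemaNoUnknownValues case earlier in this diff, which expects validation to fail when auto schema update is enabled without it.

    // Sketch only: rows carrying columns the table does not yet have are written
    // without them; once the table schema is updated, later rows keep those columns.
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("project-id:dataset-id.table")
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .ignoreUnknownValues()
            .withAutoSchemaUpdate(true));
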