diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6493ee4c267d..1079ee0ad4b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2846,7 +2846,6 @@ private WriteResult continueExpandTyped(
       StorageApiLoads<DestinationT, ElementT> storageApiLoads =
           new StorageApiLoads<>(
              destinationCoder,
-              elementCoder,
              storageApiDynamicDestinations,
              getCreateDisposition(),
              getKmsKey(),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
new file mode 100644
index 000000000000..d58c7af8cf66
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A transform that converts messages to protocol buffers in preparation for writing to BigQuery.
+ */
+public class StorageApiConvertMessages<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>, PCollection<KV<DestinationT, byte[]>>> {
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+
+  public StorageApiConvertMessages(
+      StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations) {
+    this.dynamicDestinations = dynamicDestinations;
+  }
+
+  @Override
+  public PCollection<KV<DestinationT, byte[]>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    String operationName = input.getName() + "/" + getName();
+
+    return input.apply(
+        "Convert to message",
+        ParDo.of(new ConvertMessagesDoFn<>(dynamicDestinations, operationName))
+            .withSideInputs(dynamicDestinations.getSideInputs()));
+  }
+
+  public static class ConvertMessagesDoFn<DestinationT, ElementT>
+      extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, byte[]>> {
+    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+    private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+
+    ConvertMessagesDoFn(
+        StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+        String operationName) {
+      this.dynamicDestinations = dynamicDestinations;
+      this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @Element KV<DestinationT, ElementT> element,
+        OutputReceiver<KV<DestinationT, byte[]>> o)
+        throws Exception {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+      MessageConverter<ElementT> messageConverter =
+          messageConverters.get(element.getKey(), dynamicDestinations);
+      o.output(
+          KV.of(element.getKey(), messageConverter.toMessage(element.getValue()).toByteArray()));
+    }
+  }
+}
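The new transform serializes each element to protocol-buffer bytes exactly once, so every later stage only moves opaque byte[] payloads, which is why the elementCoder parameter disappears elsewhere in this patch. A minimal sketch of that idea outside the sink, using protobuf's DynamicMessage; the rowDescriptor and the "name" field are made up for the illustration and are not part of BigQueryIO:

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

class ProtoBytesSketch {
  // Serialize once at the edge; downstream transforms, shuffles, and coders
  // then deal only in raw bytes.
  static byte[] toProtoBytes(Descriptor rowDescriptor, String value) {
    DynamicMessage message =
        DynamicMessage.newBuilder(rowDescriptor)
            .setField(rowDescriptor.findFieldByName("name"), value)
            .build();
    return message.toByteArray();
  }
}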
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index fd3b638fff9c..602ebea3e234 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -21,19 +21,16 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.values.KV;
@@ -48,10 +45,9 @@ public class StorageApiLoads
     extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
   private static final Logger LOG = LoggerFactory.getLogger(StorageApiLoads.class);
 
-  static final int FILE_TRIGGERING_RECORD_COUNT = 100;
+  static final int MAX_BATCH_SIZE_BYTES = 2 * 1024 * 1024;
 
   private final Coder<DestinationT> destinationCoder;
-  private final Coder<ElementT> elementCoder;
   private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
   private final CreateDisposition createDisposition;
   private final String kmsKey;
@@ -61,7 +57,6 @@ public class StorageApiLoads
 
   public StorageApiLoads(
       Coder<DestinationT> destinationCoder,
-      Coder<ElementT> elementCoder,
      StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
      CreateDisposition createDisposition,
      String kmsKey,
@@ -69,7 +64,6 @@ public StorageApiLoads(
      BigQueryServices bqServices,
      int numShards) {
    this.destinationCoder = destinationCoder;
-    this.elementCoder = elementCoder;
    this.dynamicDestinations = dynamicDestinations;
    this.createDisposition = createDisposition;
    this.kmsKey = kmsKey;
@@ -86,25 +80,17 @@ public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
   public WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
     // Handle triggered, low-latency loads into BigQuery.
     PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
-        input.apply(
-            "rewindowIntoGlobal",
-            Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterFirst.of(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(triggeringFrequency),
-                            AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
-                .discardingFiredPanes());
+        input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
 
     // First shard all the records.
     // TODO(reuvenlax): Add autosharding support so that users don't have to pick a shard count.
-    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
+    PCollection<KV<ShardedKey<DestinationT>, byte[]>> shardedRecords =
         inputInGlobalWindow
+            .apply("Convert", new StorageApiConvertMessages<>(dynamicDestinations))
             .apply(
                 "AddShard",
                 ParDo.of(
-                    new DoFn<KV<DestinationT, ElementT>, KV<ShardedKey<DestinationT>, ElementT>>() {
+                    new DoFn<KV<DestinationT, byte[]>, KV<ShardedKey<DestinationT>, byte[]>>() {
                       int shardNumber;
 
                       @Setup
@@ -114,8 +100,8 @@ public void setup() {
 
                       @ProcessElement
                       public void processElement(
-                          @Element KV<DestinationT, ElementT> element,
-                          OutputReceiver<KV<ShardedKey<DestinationT>, ElementT>> o) {
+                          @Element KV<DestinationT, byte[]> element,
+                          OutputReceiver<KV<ShardedKey<DestinationT>, byte[]>> o) {
                         DestinationT destination = element.getKey();
                         ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
                         buffer.putInt(++shardNumber % numShards);
@@ -123,10 +109,14 @@ public void processElement(
                             KV.of(ShardedKey.of(destination, buffer.array()), element.getValue()));
                       }
                     }))
-            .setCoder(KvCoder.of(ShardedKey.Coder.of(destinationCoder), elementCoder));
+            .setCoder(KvCoder.of(ShardedKey.Coder.of(destinationCoder), ByteArrayCoder.of()));
 
-    PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>> groupedRecords =
-        shardedRecords.apply("GroupIntoShards", GroupByKey.create());
+    PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>> groupedRecords =
+        shardedRecords.apply(
+            "GroupIntoBatches",
+            GroupIntoBatches.<ShardedKey<DestinationT>, byte[]>ofByteSize(
+                    MAX_BATCH_SIZE_BYTES, (byte[] e) -> (long) e.length)
+                .withMaxBufferingDuration(triggeringFrequency));
 
     groupedRecords.apply(
         "StorageApiWriteSharded",
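The triggered path now leans on GroupIntoBatches for both batching and the latency bound that the removed Repeatedly/AfterFirst trigger used to provide: a batch is emitted once roughly MAX_BATCH_SIZE_BYTES of payload has accumulated for a sharded key, or once triggeringFrequency elapses, whichever comes first. A standalone sketch of the same pattern on a plain keyed collection, with a 2 MB budget and a one-second flush chosen purely for illustration:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<KV<String, byte[]>> rows =
        p.apply(Create.of(KV.of("table-a", new byte[64]), KV.of("table-b", new byte[128])));

    // A batch for a key is emitted when ~2 MB of payload has accumulated,
    // or after one second of buffering, whichever comes first.
    PCollection<KV<String, Iterable<byte[]>>> batched =
        rows.apply(
            GroupIntoBatches.<String, byte[]>ofByteSize(2 * 1024 * 1024, e -> (long) e.length)
                .withMaxBufferingDuration(Duration.standardSeconds(1)));

    p.run().waitUntilFinish();
  }
}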
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index de04b120d8d8..916a1bbdb529 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -26,7 +26,6 @@
 import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Message;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import java.io.IOException;
@@ -91,7 +90,7 @@
 @SuppressWarnings("FutureReturnValueIgnored")
 public class StorageApiWritesShardedRecords<DestinationT, ElementT>
     extends PTransform<
-        PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>>, PCollection<Void>> {
+        PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
   private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
 
   private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
@@ -104,7 +103,7 @@ public class StorageApiWritesShardedRecords
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(15, TimeUnit.MINUTES)
+          .expireAfterAccess(5, TimeUnit.MINUTES)
          .removalListener(
              (RemovalNotification<String, StreamAppendClient> removal) -> {
                @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
@@ -152,7 +151,7 @@ public StorageApiWritesShardedRecords(
 
   @Override
   public PCollection<Void> expand(
-      PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>> input) {
+      PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>> input) {
     String operationName = input.getName() + "/" + getName();
     // Append records to the Storage API streams.
     PCollection<KV<String, Operation>> written =
@@ -194,11 +193,11 @@ public PCollection<Void> expand(
    * parameter controls how many rows are batched into a single ProtoRows object before we move on
    * to the next one.
    */
-  static class SplittingIterable<ElementT> implements Iterable<ProtoRows> {
-    private final Iterable<ElementT> underlying;
+  static class SplittingIterable implements Iterable<ProtoRows> {
+    private final Iterable<byte[]> underlying;
     private final long splitSize;
 
-    public SplittingIterable(Iterable<ElementT> underlying, long splitSize) {
+    public SplittingIterable(Iterable<byte[]> underlying, long splitSize) {
       this.underlying = underlying;
       this.splitSize = splitSize;
     }
@@ -206,7 +205,7 @@ public SplittingIterable(Iterable<ElementT> underlying, long splitSize) {
     @Override
     public Iterator<ProtoRows> iterator() {
       return new Iterator<ProtoRows>() {
-        final Iterator<ElementT> underlyingIterator = underlying.iterator();
+        final Iterator<byte[]> underlyingIterator = underlying.iterator();
 
         @Override
         public boolean hasNext() {
@@ -222,7 +221,7 @@ public ProtoRows next() {
           ProtoRows.Builder inserts = ProtoRows.newBuilder();
           long bytesSize = 0;
           while (underlyingIterator.hasNext()) {
-            ByteString byteString = underlyingIterator.next().toByteString();
+            ByteString byteString = ByteString.copyFrom(underlyingIterator.next());
            inserts.addSerializedRows(byteString);
            bytesSize += byteString.size();
            if (bytesSize > splitSize) {
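Because rows now arrive pre-serialized, SplittingIterable no longer converts messages itself; it only packs byte arrays into ProtoRows chunks, wrapping each payload with ByteString.copyFrom and starting a new chunk once the running size passes splitSize. A self-contained sketch of that packing logic, written as a simplified stand-in rather than the sink's actual inner class:

import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

class ProtoRowsPacker {
  /** Packs serialized rows into ProtoRows batches of roughly splitSize bytes each. */
  static List<ProtoRows> pack(Iterable<byte[]> serializedRows, long splitSize) {
    List<ProtoRows> batches = new ArrayList<>();
    ProtoRows.Builder current = ProtoRows.newBuilder();
    long bytesSoFar = 0;
    for (byte[] row : serializedRows) {
      ByteString byteString = ByteString.copyFrom(row);
      current.addSerializedRows(byteString);
      bytesSoFar += byteString.size();
      if (bytesSoFar > splitSize) { // close this batch and start a new one
        batches.add(current.build());
        current = ProtoRows.newBuilder();
        bytesSoFar = 0;
      }
    }
    if (current.getSerializedRowsCount() > 0) {
      batches.add(current.build());
    }
    return batches;
  }
}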
@@ -236,7 +235,7 @@ public ProtoRows next() {
   }
 
   class WriteRecordsDoFn
-      extends DoFn<KV<ShardedKey<DestinationT>, Iterable<ElementT>>, KV<String, Operation>> {
+      extends DoFn<KV<ShardedKey<DestinationT>, Iterable<byte[]>>, KV<String, Operation>> {
     private final Counter recordsAppended =
         Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
     private final Counter streamsCreated =
@@ -254,10 +253,10 @@ class WriteRecordsDoFn
     private final Distribution appendSplitDistribution =
         Metrics.distribution(WriteRecordsDoFn.class, "appendSplitDistribution");
 
-    private Map destinations = Maps.newHashMap();
-    private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+    private Map destinations = Maps.newHashMap();
+
     // Stores the current stream for this key.
     @StateId("streamName")
     private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();
@@ -301,7 +300,7 @@ String getOrCreateStream(
     public void process(
         ProcessContext c,
         final PipelineOptions pipelineOptions,
-        @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
+        @Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element,
         final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
         final OutputReceiver<KV<String, Operation>> o)
@@ -336,12 +335,9 @@ public void process(
 
       // Each ProtoRows object contains at most 1MB of rows.
       // TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
-      // already proto or
-      // already schema.
+      // already proto or already schema.
       final long oneMb = 1024 * 1024;
-      Iterable<ProtoRows> messages =
-          new SplittingIterable<>(
-              Iterables.transform(element.getValue(), e -> messageConverter.toMessage(e)), oneMb);
+      Iterable<ProtoRows> messages = new SplittingIterable(element.getValue(), oneMb);
 
       class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
         final ShardedKey<DestinationT> key;
@@ -412,7 +408,7 @@ public String toString() {
       Instant now = Instant.now();
       List<AppendRowsContext> contexts = Lists.newArrayList();
       RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-          new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 1000);
+          new RetryManager<>(Duration.standardSeconds(1), Duration.standardSeconds(10), 1000);
       int numSplits = 0;
       for (ProtoRows protoRows : messages) {
         ++numSplits;
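The final hunk tightens the append retry schedule by capping backoff at ten seconds instead of a minute. RetryManager is internal to this sink, so purely as an illustration, and assuming its two Duration arguments are the initial and maximum backoff and the int is the retry limit, the same capped exponential schedule can be written with Beam's public FluentBackoff utility:

import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

public class BackoffSketch {
  public static void main(String[] args) throws IOException {
    BackOff backoff =
        FluentBackoff.DEFAULT
            .withInitialBackoff(Duration.standardSeconds(1))
            .withMaxBackoff(Duration.standardSeconds(10)) // the cap this change lowers from one minute
            .withMaxRetries(1000)
            .backoff();
    // Print the first few waits: they grow exponentially from ~1s and stay near the 10s cap.
    for (int attempt = 0; attempt < 8; attempt++) {
      long millis = backoff.nextBackOffMillis();
      if (millis == BackOff.STOP) {
        break;
      }
      System.out.println("attempt " + attempt + ": wait " + millis + " ms");
    }
  }
}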