diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index a9beb5cbd7c9..cfebb7542166 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -93,12 +93,22 @@ public interface BigQueryOptions
   void setUseStorageWriteApiAtLeastOnce(Boolean value);
 
   @Description(
-      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+      "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
   @Default.Integer(0)
   Integer getNumStorageWriteApiStreams();
 
   void setNumStorageWriteApiStreams(Integer value);
 
+  @Description(
+      "When using the {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method#STORAGE_API_AT_LEAST_ONCE} write method, "
+          + "this option sets the number of stream append clients allocated per worker and destination. "
+          + "A large value can cause a job with many workers to exceed the BigQuery connection quota quickly. "
+          + "For low- to medium-volume pipelines, the default configuration should be sufficient.")
+  @Default.Integer(1)
+  Integer getNumStorageWriteApiStreamAppendClients();
+
+  void setNumStorageWriteApiStreamAppendClients(Integer value);
+
   @Description(
       "If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
   Integer getStorageWriteApiTriggeringFrequencySec();
@@ -129,4 +139,10 @@ public interface BigQueryOptions
   Integer getStorageApiAppendThresholdBytes();
 
   void setStorageApiAppendThresholdBytes(Integer value);
+
+  @Description("Maximum (best effort) record count of a single append to the storage API.")
+  @Default.Integer(150000)
+  Integer getStorageApiAppendThresholdRecordCount();
+
+  void setStorageApiAppendThresholdRecordCount(Integer value);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index e433925e5b35..35b3ddfd080a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -40,7 +40,6 @@ public StorageApiWriteRecordsInconsistent(
       StorageApiDynamicDestinations dynamicDestinations,
       BigQueryServices bqServices) {
     this.dynamicDestinations = dynamicDestinations;
-    ;
     this.bqServices = bqServices;
   }
 
@@ -57,7 +56,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
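This expand() hunk feeds the new option values into the shared WriteRecordsDoFn. A hedged sketch of the resulting call site, inferred from the widened constructor shown later in this diff (the local names bigQueryOptions and operationName are assumptions, and this fragment is not the verbatim hunk):

    // Sketch only: input, dynamicDestinations, and bqServices come from the
    // enclosing transform; `true` selects the default stream, which is what
    // gives this path its at-least-once semantics.
    BigQueryOptions bigQueryOptions =
        input.getPipeline().getOptions().as(BigQueryOptions.class);
    input.apply(
        "Write Records",
        ParDo.of(
            new StorageApiWriteUnshardedRecords.WriteRecordsDoFn<>(
                operationName,
                dynamicDestinations,
                bqServices,
                true,
                bigQueryOptions.getStorageApiAppendThresholdBytes(),
                bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
                bigQueryOptions.getNumStorageWriteApiStreamAppendClients())));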
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
   private final BigQueryServices bqServices;
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
-  // The Guava cache object is threadsafe. However our protocol requires that client pin the
-  // StreamAppendClient
-  // after looking up the cache, and we must ensure that the cache is not accessed in between the
-  // lookup and the pin
-  // (any access of the cache could trigger element expiration). Therefore most used of
-  // APPEND_CLIENTS should
-  // synchronize.
+  /**
+   * The Guava cache object is thread-safe. However, our protocol requires that clients pin the
+   * StreamAppendClient after looking up the cache, and we must ensure that the cache is not
+   * accessed in between the lookup and the pin (any access of the cache could trigger element
+   * expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
+   */
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
          .expireAfterAccess(15, TimeUnit.MINUTES)
@@ -138,7 +139,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
         MessageConverter<ElementT> messageConverter,
         DatasetService datasetService,
-        boolean useDefaultStream) {
+        boolean useDefaultStream,
+        int streamAppendClientCount) {
       this.tableUrn = tableUrn;
       this.messageConverter = messageConverter;
       this.pendingMessages = Lists.newArrayList();
       this.datasetService = datasetService;
       this.useDefaultStream = useDefaultStream;
       this.descriptorWrapper = messageConverter.getSchemaDescriptor();
+      this.clientNumber = new Random().nextInt(streamAppendClientCount);
     }
 
     void teardown() {
@@ -197,6 +203,13 @@ String getDefaultStreamName() {
       return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
     }
 
+    String getStreamAppendClientCacheEntryKey() {
+      if (useDefaultStream) {
+        return getDefaultStreamName() + "-client" + clientNumber;
+      }
+      return this.streamName;
+    }
+
     String createStreamIfNeeded() {
       try {
         if (!useDefaultStream) {
@@ -213,6 +226,10 @@ String createStreamIfNeeded() {
       return this.streamName;
     }
 
+    StreamAppendClient generateClient() throws Exception {
+      return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
+    }
+
     StreamAppendClient getStreamAppendClient(boolean lookupCache) {
       try {
         if (streamAppendClient == null) {
@@ -221,14 +238,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
           if (lookupCache) {
             this.streamAppendClient =
                 APPEND_CLIENTS.get(
-                    streamName,
-                    () ->
-                        datasetService.getStreamAppendClient(
-                            streamName, descriptorWrapper.descriptor));
+                    getStreamAppendClientCacheEntryKey(), () -> generateClient());
           } else {
-            this.streamAppendClient =
-                datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
-            APPEND_CLIENTS.put(streamName, this.streamAppendClient);
+            this.streamAppendClient = generateClient();
+            // Override the client in the cache.
+            APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), streamAppendClient);
           }
           this.streamAppendClient.pin();
         }
@@ -244,7 +258,7 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
     void maybeTickleCache() {
       if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
         synchronized (APPEND_CLIENTS) {
-          APPEND_CLIENTS.getIfPresent(streamName);
+          APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
         }
         nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
       }
     }
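With the default stream, every DestinationState on a worker now owns a fixed clientNumber drawn at construction time, and that number becomes part of the cache key; distinct keys mean up to numStorageWriteApiStreamAppendClients cached StreamAppendClients per destination per worker. A small standalone sketch of the key scheme (stream name and count are illustrative):

    import java.util.Random;

    public class CacheKeySketch {
      public static void main(String[] args) {
        int streamAppendClientCount = 3; // illustrative option value
        // Chosen once per DestinationState, exactly like the constructor above.
        int clientNumber = new Random().nextInt(streamAppendClientCount);

        String defaultStreamName =
            "projects/p/datasets/d/tables/t/streams/_default"; // illustrative
        // Default-stream (at-least-once) writes shard the cache by client number;
        // exactly-once writes keep using the per-bundle stream name as the key.
        String cacheEntryKey = defaultStreamName + "-client" + clientNumber;
        System.out.println(cacheEntryKey);
      }
    }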
@@ -262,12 +276,13 @@ void invalidateWriteStream() {
           // cache still contains the object we created before invalidating (in case another
           // thread has already invalidated
           // and recreated the stream).
+          String cacheEntryKey = getStreamAppendClientCacheEntryKey();
           @Nullable
-          StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+          StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
           if (cachedAppendClient != null
               && System.identityHashCode(cachedAppendClient)
                   == System.identityHashCode(streamAppendClient)) {
-            APPEND_CLIENTS.invalidate(streamName);
+            APPEND_CLIENTS.invalidate(cacheEntryKey);
           }
         }
         streamAppendClient = null;
@@ -327,7 +342,8 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
           inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
           if (writeStream.getInflightWaitSeconds() > 5) {
             LOG.warn(
-                "Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
+                "Storage Api write delay more than {} seconds.",
+                writeStream.getInflightWaitSeconds());
           }
           return response;
         } catch (Exception e) {
@@ -335,11 +351,11 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
         },
         contexts -> {
-          LOG.info(
-              "Append to stream "
-                  + streamName
-                  + " failed with error "
-                  + Iterables.getFirst(contexts, null).getError());
+          LOG.warn(
+              "Append to stream {} by client #{} failed with error; operations will be retried. Details: {}",
+              streamName,
+              clientNumber,
+              retrieveErrorDetails(contexts));
           invalidateWriteStream();
           appendFailures.inc();
           return RetryType.RETRY_ALL_OPERATIONS;
@@ -350,6 +366,20 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
           new Context<>());
       maybeTickleCache();
     }
+
+    String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
+      return StreamSupport.stream(contexts.spliterator(), false)
+          .map(ctx -> ctx.getError())
+          .map(
+              err ->
+                  String.format(
+                      "message: %s, stacktrace: %s",
+                      err.toString(),
+                      Lists.newArrayList(err.getStackTrace()).stream()
+                          .map(se -> se.toString())
+                          .collect(Collectors.joining("\n"))))
+          .collect(Collectors.joining(","));
+    }
   }
 
   private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
@@ -357,28 +387,32 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
     private transient @Nullable DatasetService datasetService;
     private int numPendingRecords = 0;
     private int numPendingRecordBytes = 0;
-    private static final int FLUSH_THRESHOLD_RECORDS = 150000;
     private final int flushThresholdBytes;
+    private final int flushThresholdCount;
     private final StorageApiDynamicDestinations dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
+    private int streamAppendClientCount;
 
     WriteRecordsDoFn(
         String operationName,
         StorageApiDynamicDestinations dynamicDestinations,
         BigQueryServices bqServices,
         boolean useDefaultStream,
-        int flushThresholdBytes) {
+        int flushThresholdBytes,
+        int flushThresholdCount,
+        int streamAppendClientCount) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
       this.useDefaultStream = useDefaultStream;
       this.flushThresholdBytes = flushThresholdBytes;
+      this.flushThresholdCount = flushThresholdCount;
+      this.streamAppendClientCount = streamAppendClientCount;
     }
 
     boolean shouldFlush() {
-      return numPendingRecords > FLUSH_THRESHOLD_RECORDS
-          || numPendingRecordBytes > flushThresholdBytes;
+      return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
     }
 
     void flushIfNecessary() throws Exception {
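The previously hard-coded FLUSH_THRESHOLD_RECORDS constant is now the flushThresholdCount parameter fed from storageApiAppendThresholdRecordCount, and a buffer flushes as soon as either the record count or the byte size crosses its threshold. A tiny standalone sketch of the predicate (the record threshold mirrors the documented default; the byte threshold here is purely illustrative):

    public class FlushPredicateSketch {
      // Mirrors WriteRecordsDoFn.shouldFlush().
      static boolean shouldFlush(
          int numPendingRecords,
          int numPendingRecordBytes,
          int flushThresholdCount,
          int flushThresholdBytes) {
        return numPendingRecords > flushThresholdCount
            || numPendingRecordBytes > flushThresholdBytes;
      }

      public static void main(String[] args) {
        System.out.println(shouldFlush(150001, 1024, 150000, 2097152)); // true: record count
        System.out.println(shouldFlush(10, 3000000, 150000, 2097152)); // true: byte size
        System.out.println(shouldFlush(10, 1024, 150000, 2097152)); // false
      }
    }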
@@ -432,7 +466,11 @@ DestinationState createDestinationState(
         throw new RuntimeException(e);
       }
       return new DestinationState(
-          tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
+          tableDestination1.getTableUrn(),
+          messageConverter,
+          datasetService,
+          useDefaultStream,
+          streamAppendClientCount);
     }
 
     @ProcessElement
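Taken together, the new knobs are ordinary Beam pipeline options. A minimal sketch of configuring them, with illustrative values (the defaults, 1 append client and a 150000-record threshold, should be sufficient for low- to medium-volume pipelines):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class StorageApiSinkOptionsExample {
      public static void main(String[] args) {
        // Equivalent command-line flags:
        //   --numStorageWriteApiStreamAppendClients=2
        //   --storageApiAppendThresholdRecordCount=100000
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
        options.setNumStorageWriteApiStreamAppendClients(2); // illustrative; default is 1
        options.setStorageApiAppendThresholdRecordCount(100000); // illustrative; default is 150000
        System.out.println(options.getNumStorageWriteApiStreamAppendClients());
      }
    }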