From d10dac1afd1a10a1cb77568a8d0c6300603575d6 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Fri, 12 Jul 2024 14:46:37 -0400
Subject: [PATCH] Add options to control number of Storage API connections
 when using multiplexing (#31721)

* add options to set min and max connections to connection management pool; rename counter to be more accurate
* add multiplexing description
* add to CHANGES.md
* clarify documentation and address comments
* adjust description
* add details
---
 CHANGES.md                                        |  2 ++
 .../sdk/io/gcp/bigquery/AppendClientInfo.java     |  8 +++---
 .../sdk/io/gcp/bigquery/BigQueryOptions.java      | 27 +++++++++++++++++++
 .../io/gcp/bigquery/BigQueryServicesImpl.java     |  9 +++++++
 4 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 96c436d89ecd..fc94877a2bb3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
+* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
 * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
@@ -82,6 +83,7 @@
 
 ## Bugfixes
 
+* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Security Fixes
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 5a12e81ea79d..7505f77fb5b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -40,8 +40,8 @@
  */
 @AutoValue
 abstract class AppendClientInfo {
-  private final Counter activeConnections =
-      Metrics.counter(AppendClientInfo.class, "activeConnections");
+  private final Counter activeStreamAppendClients =
+      Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
 
   abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
@@ -123,7 +123,7 @@ public AppendClientInfo withAppendClient(
           writeStreamService.getStreamAppendClient(
               streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);
 
-      activeConnections.inc();
+      activeStreamAppendClients.inc();
 
       return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
     }
@@ -133,7 +133,7 @@ public void close() {
     BigQueryServices.StreamAppendClient client = getStreamAppendClient();
     if (client != null) {
       getCloseAppendClient().accept(client);
-      activeConnections.dec();
+      activeStreamAppendClients.dec();
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cd1fc6d3842c..ba76f483f774 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,28 @@ public interface BigQueryOptions
 
   void setNumStorageWriteApiStreamAppendClients(Integer value);
 
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+          + "this option sets the minimum number of connections each pool creates before any connections are shared. This is "
+          + "on a per worker, per region basis. Note that in practice, the minimum number of connections created is the minimum "
+          + "of this value and (numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many "
+          + "connections at first and will only create more connections if the current ones are \"overwhelmed\". Consider "
+          + "increasing this value if you are running into performance issues.")
+  @Default.Integer(2)
+  Integer getMinConnectionPoolConnections();
+
+  void setMinConnectionPoolConnections(Integer value);
+
+  @Description(
+      "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+          + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. "
+          + "If writing to many dynamic destinations (>20) and experiencing performance issues or seeing append operations competing "
+          + "for streams, consider increasing this value.")
+  @Default.Integer(20)
+  Integer getMaxConnectionPoolConnections();
+
+  void setMaxConnectionPoolConnections(Integer value);
+
   @Description("The max number of messages inflight that we expect each connection will retain.")
   @Default.Long(1000)
   Long getStorageWriteMaxInflightRequests();
@@ -122,6 +144,11 @@ public interface BigQueryOptions
 
   void setStorageWriteMaxInflightBytes(Long value);
 
+  @Description(
+      "Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+          + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "
+          + "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. "
+          + "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
   @Default.Boolean(false)
   Boolean getUseStorageApiConnectionPool();
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2bdba0b053c8..c6b0e17e59db 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -69,6 +69,7 @@
 import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
 import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
 import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
@@ -1423,6 +1424,14 @@ public StreamAppendClient getStreamAppendClient(
               bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
               bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());
 
+      ConnectionWorkerPool.setOptions(
+          ConnectionWorkerPool.Settings.builder()
+              .setMinConnectionsPerRegion(
+                  options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
+              .setMaxConnectionsPerRegion(
+                  options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
+              .build());
+
       StreamWriter streamWriter =
           StreamWriter.newBuilder(streamName, newWriteClient)
               .setExecutorProvider(