From 2f55b0f965d442ed879b11996dfe9a2217027f97 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 28 Jun 2024 19:14:34 -0400 Subject: [PATCH 1/8] add options to set min and max connections to connection management pool; rename counter to be more accurate --- .../sdk/io/gcp/bigquery/AppendClientInfo.java | 8 ++++---- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 20 +++++++++++++++++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 9 +++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 5a12e81ea79d..7505f77fb5b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -40,8 +40,8 @@ */ @AutoValue abstract class AppendClientInfo { - private final Counter activeConnections = - Metrics.counter(AppendClientInfo.class, "activeConnections"); + private final Counter activeStreamAppendClients = + Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients"); abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient(); @@ -123,7 +123,7 @@ public AppendClientInfo withAppendClient( writeStreamService.getStreamAppendClient( streamName, getDescriptor(), useConnectionPool, missingValueInterpretation); - activeConnections.inc(); + activeStreamAppendClients.inc(); return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build(); } @@ -133,7 +133,7 @@ public void close() { BigQueryServices.StreamAppendClient client = getStreamAppendClient(); if (client != null) { getCloseAppendClient().accept(client); - activeConnections.dec(); + activeStreamAppendClients.dec(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index cd1fc6d3842c..c973d2d0efdf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -109,6 +109,26 @@ public interface BigQueryOptions void setNumStorageWriteApiStreamAppendClients(Integer value); + @Description( + "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + + "this option sets the minimum number of connections each pool creates. This is on a per worker, per region basis. " + + "Note that in practice, the minimum number of connections created is the minimum of this value and " + + "(numStorageWriteApiStreamAppendClients x num destinations).") + @Default.Integer(2) + Integer getMinConnectionPoolConnections(); + + void setMinConnectionPoolConnections(Integer value); + + @Description( + "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. " + + "This value should be greater than or equal to the total number of dynamic destinations, otherwise a " + + "race condition occurs where append operations compete over streams.") + @Default.Integer(20) + Integer getMaxConnectionPoolConnections(); + + void setMaxConnectionPoolConnections(Integer value); + @Description("The max number of messages inflight that we expect each connection will retain.") @Default.Long(1000) Long getStorageWriteMaxInflightRequests(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 2bdba0b053c8..c6b0e17e59db 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -69,6 +69,7 @@ import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; @@ -1423,6 +1424,14 @@ public StreamAppendClient getStreamAppendClient( bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(), bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId()); + ConnectionWorkerPool.setOptions( + ConnectionWorkerPool.Settings.builder() + .setMinConnectionsPerRegion( + options.as(BigQueryOptions.class).getMinConnectionPoolConnections()) + .setMaxConnectionsPerRegion( + options.as(BigQueryOptions.class).getMaxConnectionPoolConnections()) + .build()); + StreamWriter streamWriter = StreamWriter.newBuilder(streamName, newWriteClient) .setExecutorProvider( From 20a9c4717730edc6b4f63ca6fae275a720404f56 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 28 Jun 2024 19:20:10 -0400 Subject: [PATCH 2/8] add multiplexing description --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index c973d2d0efdf..6584e9dfd565 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -142,6 +142,9 @@ public interface BigQueryOptions void setStorageWriteMaxInflightBytes(Long value); + @Description( + "Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE" + + " mode. For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management") @Default.Boolean(false) Boolean getUseStorageApiConnectionPool(); From 70b135b7e16d302b79a0ec4585025151e482b5f2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 28 Jun 2024 19:49:38 -0400 Subject: [PATCH 3/8] add to CHANGES.md --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 0ee9f656a180..6f033ed66839 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ ## New Features / Improvements * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). +* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) ## Breaking Changes @@ -78,6 +79,7 @@ ## Bugfixes +* Fixed a bug in BigQuery's batch Storage Write API connector that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes From c224392ae3efc11b37a4f630f38f11b65b35c297 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 28 Jun 2024 19:59:35 -0400 Subject: [PATCH 4/8] whitespace --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6f033ed66839..129048cf6a2f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,7 +67,7 @@ ## New Features / Improvements * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). -* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) +* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) ## Breaking Changes From 3801412eb92ca8b2f6fe50ef2d6076e55a20a1ad Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 29 Jun 2024 17:36:30 -0400 Subject: [PATCH 5/8] doc --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 129048cf6a2f..0cff95cec5a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -79,7 +79,7 @@ ## Bugfixes -* Fixed a bug in BigQuery's batch Storage Write API connector that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) +* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes From 7605622c9068c3e9de96660a1d0d4aa453731c4a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Jul 2024 12:45:30 -0400 Subject: [PATCH 6/8] clarify documentation and address comments --- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 6584e9dfd565..41ef67139ce9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -113,7 +113,9 @@ public interface BigQueryOptions "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + "this option sets the minimum number of connections each pool creates. This is on a per worker, per region basis. " + "Note that in practice, the minimum number of connections created is the minimum of this value and " - + "(numStorageWriteApiStreamAppendClients x num destinations).") + + "(numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many connections at first " + + "and will only create more connections if the current ones are \"overwhelmed\". Consider increasing this value if " + + "you are running into performance issues.") @Default.Integer(2) Integer getMinConnectionPoolConnections(); @@ -122,8 +124,9 @@ public interface BigQueryOptions @Description( "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. " - + "This value should be greater than or equal to the total number of dynamic destinations, otherwise a " - + "race condition occurs where append operations compete over streams.") + + "This value should be greater than or equal to the total number of dynamic destinations, otherwise performance issues " + + "may occur due to race conditions where append operations compete over limited number of streams. If this occurs, " + + "individual append operations will intermittently fail but will be retried.") @Default.Integer(20) Integer getMaxConnectionPoolConnections(); @@ -144,7 +147,9 @@ public interface BigQueryOptions @Description( "Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE" - + " mode. For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management") + + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning " + + "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. " + + "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management") @Default.Boolean(false) Boolean getUseStorageApiConnectionPool(); From 7951755b2acc9c80ddd97130aae42131868d73a4 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Jul 2024 13:27:57 -0400 Subject: [PATCH 7/8] adjust description --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 41ef67139ce9..79eb62d29982 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -124,9 +124,8 @@ public interface BigQueryOptions @Description( "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " + "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. " - + "This value should be greater than or equal to the total number of dynamic destinations, otherwise performance issues " - + "may occur due to race conditions where append operations compete over limited number of streams. If this occurs, " - + "individual append operations will intermittently fail but will be retried.") + + "If writing to many dynamic destinations (>20) and experiencing performance issues or seeing append operations competing" + + "for streams, consider increasing this value.") @Default.Integer(20) Integer getMaxConnectionPoolConnections(); From 5e07a0306bf513d33b27da0000c421be32e5c51f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Jul 2024 16:15:33 -0400 Subject: [PATCH 8/8] add details --- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 79eb62d29982..ba76f483f774 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -111,11 +111,11 @@ public interface BigQueryOptions @Description( "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), " - + "this option sets the minimum number of connections each pool creates. This is on a per worker, per region basis. " - + "Note that in practice, the minimum number of connections created is the minimum of this value and " - + "(numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many connections at first " - + "and will only create more connections if the current ones are \"overwhelmed\". Consider increasing this value if " - + "you are running into performance issues.") + + "this option sets the minimum number of connections each pool creates before any connections are shared. This is " + + "on a per worker, per region basis. Note that in practice, the minimum number of connections created is the minimum " + + "of this value and (numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many " + + "connections at first and will only create more connections if the current ones are \"overwhelmed\". Consider " + + "increasing this value if you are running into performance issues.") @Default.Integer(2) Integer getMinConnectionPoolConnections();