From 761bab7613ccc0e48dc8742366a01342b8756946 Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Thu, 7 Sep 2023 17:36:27 -0400 Subject: [PATCH 1/3] Client change for using request priority to enable flow control In this stage, client needs to do 3 things: 1. Always create flow control callable regardless of client flag 2. Make sure the entire callable behave like no-op if RateLimitInfo is not present. 3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag. Meanwhile, setting client flag would still set the feature flag. On server side, we'll compute and return RateLimitInfo if server side sees priority is low. --- .../data/v2/stub/EnhancedBigtableStub.java | 11 ++--- .../v2/stub/EnhancedBigtableStubSettings.java | 1 + .../RateLimitingServerStreamingCallable.java | 45 +++++++++++-------- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..86eaaa9579 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -728,19 +728,20 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable callable = + ServerStreamingCallable statsHeader = new StatsHeadersServerStreamingCallable<>(base); - if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { - callable = new RateLimitingServerStreamingCallable(callable); - } + // Always create this callable because flow control will be enabled by the presence of + // RateLimitInfo, not the client flag + ServerStreamingCallable flowControl = + new RateLimitingServerStreamingCallable(statsHeader); // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(callable); + new ConvertExceptionCallable<>(flowControl); ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..6951b5ab4d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() { this.setTransportChannelProvider(channelProviderBuilder.build()); } + // Will be deprecated once we migrate flow control user to use request priority if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { // only set mutate rows feature flag when this feature is enabled featureFlags.setMutateRowsRateLimit(true); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..7ffef55b70 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,6 +65,10 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; + // Disabled by default, enabled if RateLimitInfo is present, which is set on server side when + // feature flag is present or low request priority is used. + private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(false); + private final RateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); @@ -73,7 +78,7 @@ class RateLimitingServerStreamingCallable @Nonnull ServerStreamingCallable innerCallable) { this.limiter = RateLimiter.create(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate()); + logger.info("Rate limiting callable is created with initial QPS of " + limiter.getRate()); } @Override @@ -81,32 +86,25 @@ public void call( MutateRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { - Stopwatch stopwatch = Stopwatch.createStarted(); - limiter.acquire(); - stopwatch.stop(); - if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (rateLimitEnabled.get()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + limiter.acquire(); + stopwatch.stop(); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()) + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } - RateLimitingResponseObserver innerObserver = - new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); + RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver); innerCallable.call(request, innerObserver, context); } class RateLimitingResponseObserver extends SafeResponseObserver { private final ResponseObserver outerObserver; - private final RateLimiter rateLimiter; - - private final AtomicReference lastQpsChangeTime; - RateLimitingResponseObserver( - RateLimiter rateLimiter, - AtomicReference lastQpsChangeTime, - ResponseObserver observer) { + RateLimitingResponseObserver(ResponseObserver observer) { super(observer); this.outerObserver = observer; - this.rateLimiter = rateLimiter; - this.lastQpsChangeTime = lastQpsChangeTime; } @Override @@ -116,7 +114,13 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + // Must not limit rate if RateLimitInfo is not present. + // Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag + // setting. if (response.hasRateLimitInfo()) { + if (!rateLimitEnabled.getAndSet(true)) { + logger.info("Rate limiting is enabled with QPS of " + limiter.getRate()); + } RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,6 +130,11 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + // Disable in case customer switched from low to higher priorities. + if (rateLimitEnabled.getAndSet(false)) { + logger.info("Rate limiting is disabled"); + } } } From 5f4f32f4b25f618bcedc63b68569edeac2ff21b9 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 11 Sep 2023 15:31:54 -0400 Subject: [PATCH 2/3] remove log noise Change-Id: I721ad64ce49b0310eb44d86e3b171ab2bef46437 --- .../data/v2/stub/RateLimitingServerStreamingCallable.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 7ffef55b70..a9aa8f0a33 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -78,7 +78,6 @@ class RateLimitingServerStreamingCallable @Nonnull ServerStreamingCallable innerCallable) { this.limiter = RateLimiter.create(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting callable is created with initial QPS of " + limiter.getRate()); } @Override From d128fce9cc44772a3ac0c98937eedcf2ddeafcfc Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 11 Sep 2023 16:25:26 -0400 Subject: [PATCH 3/3] fix missing call to outer observer Change-Id: Ibda6daed72f0f267ad741836701848f33c2d5377 --- .../data/v2/stub/RateLimitingServerStreamingCallable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index a9aa8f0a33..a30508b436 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -135,6 +135,8 @@ protected void onResponseImpl(MutateRowsResponse response) { logger.info("Rate limiting is disabled"); } } + + outerObserver.onResponse(response); } @Override