From eec9bd96539f4c278eecc9d58cab86740acd4a53 Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Thu, 7 Sep 2023 17:36:27 -0400 Subject: [PATCH] Client change for using request priority to enable flow control In this stage, client needs to do 3 things: 1. Always create flow control callable regardless of client flag 2. Make sure the entire callable behave like no-op if RateLimitInfo is not present. 3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag. Meanwhile, setting client flag would still set the feature flag. On server side, we'll compute and return RateLimitInfo if AFE sees priority is low. --- .../data/v2/stub/EnhancedBigtableStub.java | 10 ++++----- .../v2/stub/EnhancedBigtableStubSettings.java | 1 + .../RateLimitingServerStreamingCallable.java | 22 ++++++++++++++----- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..81f747adf6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -728,19 +728,19 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable callable = + ServerStreamingCallable statsHeader = new StatsHeadersServerStreamingCallable<>(base); - if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { - callable = new RateLimitingServerStreamingCallable(callable); - } + // Always create this callable because flow control will be enabled by the presence of RateLimitInfo, not the client flag + ServerStreamingCallable flowControl = new RateLimitingServerStreamingCallable( + statsHeader); // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(callable); + new ConvertExceptionCallable<>(flowControl); ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..6951b5ab4d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() { this.setTransportChannelProvider(channelProviderBuilder.build()); } + // Will be deprecated once we migrate flow control user to use request priority if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { // only set mutate rows feature flag when this feature is enabled featureFlags.setMutateRowsRateLimit(true); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..450881b7a7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,6 +65,8 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; + private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(true); + private final RateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); @@ -81,12 +84,14 @@ public void call( MutateRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { - Stopwatch stopwatch = Stopwatch.createStarted(); - limiter.acquire(); - stopwatch.stop(); - if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (rateLimitEnabled.get()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + limiter.acquire(); + stopwatch.stop(); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()) + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); @@ -116,7 +121,10 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + // No-op if RateLimitInfo is not present + // Update QPS if RateLimitInfo is present, regardless of client side flag setting if (response.hasRateLimitInfo()) { + rateLimitEnabled.set(true); RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,6 +134,8 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + rateLimitEnabled.set(false); } }