From ec8d67736137f8c0537f6142d91c77a9e75b563d Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Thu, 7 Sep 2023 17:36:27 -0400 Subject: [PATCH] Client change for using request priority to enable flow control In this stage, client needs to do 3 things: 1. Always create flow control callable regardless of client flag 2. Make sure the entire callable behave like no-op if RateLimitInfo is not present. 3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag. Meanwhile, setting client flag would still set the feature flag. On server side, we'll compute and return RateLimitInfo if AFE sees priority is low. --- .../data/v2/stub/EnhancedBigtableStub.java | 10 ++++----- .../v2/stub/EnhancedBigtableStubSettings.java | 1 + .../RateLimitingServerStreamingCallable.java | 22 ++++++++++++++----- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..81f747adf6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -728,19 +728,19 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable callable = + ServerStreamingCallable statsHeader = new StatsHeadersServerStreamingCallable<>(base); - if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { - callable = new RateLimitingServerStreamingCallable(callable); - } + // Always create this callable because flow control will be enabled by the presence of RateLimitInfo, not the client flag + ServerStreamingCallable flowControl = new RateLimitingServerStreamingCallable( + statsHeader); // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(callable); + new ConvertExceptionCallable<>(flowControl); ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..6951b5ab4d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() { this.setTransportChannelProvider(channelProviderBuilder.build()); } + // Will be deprecated once we migrate flow control user to use request priority if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { // only set mutate rows feature flag when this feature is enabled featureFlags.setMutateRowsRateLimit(true); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..635782b1c2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,6 +65,8 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; + private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(true); + private final RateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); @@ -81,12 +84,14 @@ public void call( MutateRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { - Stopwatch stopwatch = Stopwatch.createStarted(); - limiter.acquire(); - stopwatch.stop(); - if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (rateLimitEnabled.get()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + limiter.acquire(); + stopwatch.stop(); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()) + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); @@ -116,7 +121,10 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + // Must not limit rate if RateLimitInfo is not present + // Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag setting if (response.hasRateLimitInfo()) { + rateLimitEnabled.set(true); RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,6 +134,8 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + rateLimitEnabled.set(false); } }