diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..86eaaa9579 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -728,19 +728,20 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable callable = + ServerStreamingCallable statsHeader = new StatsHeadersServerStreamingCallable<>(base); - if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { - callable = new RateLimitingServerStreamingCallable(callable); - } + // Always create this callable because flow control will be enabled by the presence of + // RateLimitInfo, not the client flag + ServerStreamingCallable flowControl = + new RateLimitingServerStreamingCallable(statsHeader); // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(callable); + new ConvertExceptionCallable<>(flowControl); ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..6951b5ab4d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() { this.setTransportChannelProvider(channelProviderBuilder.build()); } + // Will be deprecated once we migrate flow control user to use request priority if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { // only set mutate rows feature flag when this feature is enabled featureFlags.setMutateRowsRateLimit(true); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..a30508b436 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,6 +65,10 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; + // Disabled by default, enabled if RateLimitInfo is present, which is set on server side when + // feature flag is present or low request priority is used. + private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(false); + private final RateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); @@ -73,7 +78,6 @@ class RateLimitingServerStreamingCallable @Nonnull ServerStreamingCallable innerCallable) { this.limiter = RateLimiter.create(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate()); } @Override @@ -81,32 +85,25 @@ public void call( MutateRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { - Stopwatch stopwatch = Stopwatch.createStarted(); - limiter.acquire(); - stopwatch.stop(); - if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (rateLimitEnabled.get()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + limiter.acquire(); + stopwatch.stop(); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()) + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } - RateLimitingResponseObserver innerObserver = - new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); + RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver); innerCallable.call(request, innerObserver, context); } class RateLimitingResponseObserver extends SafeResponseObserver { private final ResponseObserver outerObserver; - private final RateLimiter rateLimiter; - - private final AtomicReference lastQpsChangeTime; - RateLimitingResponseObserver( - RateLimiter rateLimiter, - AtomicReference lastQpsChangeTime, - ResponseObserver observer) { + RateLimitingResponseObserver(ResponseObserver observer) { super(observer); this.outerObserver = observer; - this.rateLimiter = rateLimiter; - this.lastQpsChangeTime = lastQpsChangeTime; } @Override @@ -116,7 +113,13 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + // Must not limit rate if RateLimitInfo is not present. + // Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag + // setting. if (response.hasRateLimitInfo()) { + if (!rateLimitEnabled.getAndSet(true)) { + logger.info("Rate limiting is enabled with QPS of " + limiter.getRate()); + } RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,7 +129,14 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + // Disable in case customer switched from low to higher priorities. + if (rateLimitEnabled.getAndSet(false)) { + logger.info("Rate limiting is disabled"); + } } + + outerObserver.onResponse(response); } @Override