diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..81f747adf6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -728,19 +728,19 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { .build(), settings.bulkMutateRowsSettings().getRetryableCodes()); - ServerStreamingCallable callable = + ServerStreamingCallable statsHeader = new StatsHeadersServerStreamingCallable<>(base); - if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { - callable = new RateLimitingServerStreamingCallable(callable); - } + // Always create this callable because flow control will be enabled by the presence of RateLimitInfo, not the client flag + ServerStreamingCallable flowControl = new RateLimitingServerStreamingCallable( + statsHeader); // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient // and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(callable); + new ConvertExceptionCallable<>(flowControl); ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..6951b5ab4d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() { this.setTransportChannelProvider(channelProviderBuilder.build()); } + // Will be deprecated once we migrate flow control user to use request priority if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) { // only set mutate rows feature flag when this feature is enabled featureFlags.setMutateRowsRateLimit(true); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 278019b07e..635782b1c2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,6 +65,8 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; + private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(true); + private final RateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); @@ -81,12 +84,14 @@ public void call( MutateRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { - Stopwatch stopwatch = Stopwatch.createStarted(); - limiter.acquire(); - stopwatch.stop(); - if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (rateLimitEnabled.get()) { + Stopwatch stopwatch = Stopwatch.createStarted(); + limiter.acquire(); + stopwatch.stop(); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()) + .batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); @@ -116,7 +121,10 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + // Must not limit rate if RateLimitInfo is not present + // Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag setting if (response.hasRateLimitInfo()) { + rateLimitEnabled.set(true); RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,6 +134,8 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + rateLimitEnabled.set(false); } }