Skip to content

Commit

Permalink
Client change for using request priority to enable flow control
Browse files Browse the repository at this point in the history
In this stage, client needs to do 3 things:
1. Always create flow control callable regardless of client flag
2. Make sure the entire callable behave like no-op if RateLimitInfo is not present.
3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag.

Meanwhile, setting client flag would still set the feature flag.

On server side, we'll compute and return RateLimitInfo if AFE sees priority is low.
  • Loading branch information
kongweihan committed Sep 7, 2023
1 parent 7cc8a28 commit ec8d677
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -728,19 +728,19 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable =
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> statsHeader =
new StatsHeadersServerStreamingCallable<>(base);

if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
callable = new RateLimitingServerStreamingCallable(callable);
}
// Always create this callable because flow control will be enabled by the presence of RateLimitInfo, not the client flag
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> flowControl = new RateLimitingServerStreamingCallable(
statsHeader);

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(callable);
new ConvertExceptionCallable<>(flowControl);

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() {
this.setTransportChannelProvider(channelProviderBuilder.build());
}

// Will be deprecated once we migrate flow control user to use request priority
if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
// only set mutate rows feature flag when this feature is enabled
featureFlags.setMutateRowsRateLimit(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -64,6 +65,8 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(true);

private final RateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
Expand All @@ -81,12 +84,14 @@ public void call(
MutateRowsRequest request,
ResponseObserver<MutateRowsResponse> responseObserver,
ApiCallContext context) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (rateLimitEnabled.get()) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
Expand Down Expand Up @@ -116,7 +121,10 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(MutateRowsResponse response) {
// Must not limit rate if RateLimitInfo is not present
// Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag setting
if (response.hasRateLimitInfo()) {
rateLimitEnabled.set(true);
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
Expand All @@ -126,6 +134,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
} else {
rateLimitEnabled.set(false);
}
}

Expand Down

0 comments on commit ec8d677

Please sign in to comment.