Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Client change for using request priority to enable flow control #1900

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -728,19 +728,20 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable =
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> statsHeader =
new StatsHeadersServerStreamingCallable<>(base);

if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
callable = new RateLimitingServerStreamingCallable(callable);
}
// Always create this callable because flow control will be enabled by the presence of
// RateLimitInfo, not the client flag
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> flowControl =
new RateLimitingServerStreamingCallable(statsHeader);

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(callable);
new ConvertExceptionCallable<>(flowControl);

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() {
this.setTransportChannelProvider(channelProviderBuilder.build());
}

// Will be deprecated once we migrate flow control user to use request priority
if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
// only set mutate rows feature flag when this feature is enabled
featureFlags.setMutateRowsRateLimit(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -64,6 +65,10 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

// Disabled by default, enabled if RateLimitInfo is present, which is set on server side when
// feature flag is present or low request priority is used.
private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(false);

private final RateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
Expand All @@ -73,40 +78,32 @@ class RateLimitingServerStreamingCallable
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
}

@Override
public void call(
MutateRowsRequest request,
ResponseObserver<MutateRowsResponse> responseObserver,
ApiCallContext context) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (rateLimitEnabled.get()) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
Expand All @@ -116,7 +113,13 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(MutateRowsResponse response) {
// Must not limit rate if RateLimitInfo is not present.
// Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag
// setting.
if (response.hasRateLimitInfo()) {
if (!rateLimitEnabled.getAndSet(true)) {
logger.info("Rate limiting is enabled with QPS of " + limiter.getRate());
}
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
Expand All @@ -126,7 +129,14 @@ protected void onResponseImpl(MutateRowsResponse response) {
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
} else {
// Disable in case customer switched from low to higher priorities.
if (rateLimitEnabled.getAndSet(false)) {
logger.info("Rate limiting is disabled");
}
}

outerObserver.onResponse(response);
}

@Override
Expand Down
Loading