Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update tracers to use built in metrics #1244

Merged
merged 15 commits into from
Jun 29, 2022
4 changes: 4 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
<!-- NOTE: Dependencies are organized into two groups, production and test.
Within a group, dependencies are sorted by (groupId, artifactId) -->

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
</dependency>
<!-- Production dependencies -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
Expand Down Expand Up @@ -194,6 +195,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", settings.getProjectId())
.put("instance_id", settings.getInstanceId())
.put("app_profile", settings.getAppProfileId())
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
Expand All @@ -218,6 +225,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
BuiltinMetricsTracerFactory.create(builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
Expand Down Expand Up @@ -466,7 +474,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(tracedBatcher);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
Expand Down Expand Up @@ -594,11 +602,11 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcher);

new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;
Expand All @@ -35,6 +35,23 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/** annotate when onRequest is called. This will be called in BuiltinMetricsTracer. */
public void onRequest(int requestCount) {
// noop
}

/**
* annotate when automatic flow control is disabled. This will be called in BuiltinMetricsTracer.
*/
public void disableFlowControl() {
// noop
}

/** annotate after the callback from onResponse. This will be called in BuiltinMetricsTracer. */
public void afterResponse(long applicationLatency) {
// noop
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
Expand All @@ -57,4 +74,12 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}

/**
* Set the Bigtable zone and cluster so metrics can be tagged with location information. This will
* be called in BuiltinMetricsTracer.
*/
public void setLocations(String zone, String cluster) {
// noop
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;

import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.math.IntMath;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A {@link BigtableTracer} that records built-in metrics and publish under the
* bigtable.googleapis.com/client namespace
*/
class BuiltinMetricsTracer extends BigtableTracer {

private final StatsRecorderWrapper recorder;

private final OperationType operationType;
private final SpanName spanName;

// Operation level metrics
private final AtomicBoolean opFinished = new AtomicBoolean();
private final Stopwatch operationTimer = Stopwatch.createStarted();
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();

// Attempt level metrics
private int attemptCount = 0;
private Stopwatch attemptTimer;
private volatile int attempt = 0;

// Total server latency needs to be atomic because it's accessed from different threads. E.g.
// request() from user thread and attempt failed from grpc thread. We're only measuring the extra
// time application spent blocking grpc buffer, which will be operationLatency - serverLatency.
private final AtomicLong totalServerLatency = new AtomicLong(0);
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is
// flushed to memory.
private final Stopwatch serverLatencyTimer = Stopwatch.createUnstarted();
private final AtomicBoolean serverLatencyTimerIsRunning = new AtomicBoolean();

private boolean flowControlIsDisabled = false;

private AtomicInteger requestLeft = new AtomicInteger(0);

// Monitored resource labels
private String tableId = "undefined";
private String zone = "undefined";
private String cluster = "undefined";

// gfe stats
private AtomicLong gfeMissingHeaders = new AtomicLong(0);

@VisibleForTesting
BuiltinMetricsTracer(
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
this.operationType = operationType;
this.spanName = spanName;
this.recorder = recorder;
}

@Override
public Scope inScope() {
return new Scope() {
@Override
public void close() {}
};
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
}

@Override
public void operationCancelled() {
recordOperationCompletion(new CancellationException());
}

@Override
public void operationFailed(Throwable error) {
recordOperationCompletion(error);
}

@Override
public void attemptStarted(int attemptNumber) {
attemptStarted(null, attemptNumber);
}

@Override
public void attemptStarted(Object request, int attemptNumber) {
this.attempt = attemptNumber;
attemptCount++;
attemptTimer = Stopwatch.createStarted();
if (request != null) {
this.tableId = Util.extractTableId(request);
}
if (!flowControlIsDisabled) {
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
}
}
}

@Override
public void attemptSucceeded() {
recordAttemptCompletion(null);
}

@Override
public void attemptCancelled() {
recordAttemptCompletion(new CancellationException());
}

@Override
public void attemptFailed(Throwable error, Duration delay) {
recordAttemptCompletion(error);
}

@Override
public void onRequest(int requestCount) {
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
if (flowControlIsDisabled) {
// On request is only called when auto flow control is disabled. When auto flow control is
// disabled, server latency is measured between onRequest and onResponse.
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
}
}
}

@Override
public void responseReceived() {
// When auto flow control is enabled, server latency is measured between afterResponse and
// responseReceived.
// When auto flow control is disabled, server latency is measured between onRequest and
// responseReceived.
// When auto flow control is disabled and application requested multiple responses, server
// latency is measured between afterResponse and responseReceived.
// In all the cases, we want to stop the serverLatencyTimer here.
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
}
}

@Override
public void afterResponse(long applicationLatency) {
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
// When auto flow control is enabled, request will never be called, so server latency is
// measured between after the last response is processed and before the next response is
// received. If flow control is disabled but requestLeft is greater than 0,
// also start the timer to count the time between afterResponse and responseReceived.
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
}
}
}

@Override
public int getAttempt() {
return attempt;
}

@Override
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
// Record the metrics and put in the map after the attempt is done, so we can have cluster and
// zone information
if (latency != null) {
recorder.putGfeLatencies(latency);
} else {
gfeMissingHeaders.incrementAndGet();
}
recorder.putGfeMissingHeaders(gfeMissingHeaders.get());
}

@Override
public void setLocations(String zone, String cluster) {
this.zone = zone;
this.cluster = cluster;
}

@Override
public void batchRequestThrottled(long throttledTimeMs) {
recorder.putBatchRequestThrottled(throttledTimeMs);
}

@Override
public void disableFlowControl() {
flowControlIsDisabled = true;
}

private void recordOperationCompletion(@Nullable Throwable status) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();
long operationLatency = operationTimer.elapsed(TimeUnit.MILLISECONDS);

recorder.putRetryCount(attemptCount - 1);

// serverLatencyTimer should already be stopped in recordAttemptCompletion
recorder.putOperationLatencies(operationLatency);
recorder.putApplicationLatencies(operationLatency - totalServerLatency.get());

if (operationType == OperationType.ServerStreaming
&& spanName.getMethodName().equals("ReadRows")) {
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
}

recorder.record(Util.extractStatus(status), tableId, zone, cluster);
}

private void recordAttemptCompletion(@Nullable Throwable status) {
// If the attempt failed, the time spent in retry should be counted in application latency.
// Stop the stopwatch and decrement requestLeft.
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
requestLeft.decrementAndGet();
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
}
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
}
}
Loading