feat: update tracers to use built in metrics
mutianf committed Jun 8, 2022
1 parent f4a7bfb commit 38b2804
Showing 12 changed files with 780 additions and 27 deletions.
9 changes: 9 additions & 0 deletions google-cloud-bigtable/pom.xml
@@ -54,13 +54,22 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
<version>2.6.3-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigtable:current} -->
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- NOTE: Dependencies are organized into two groups, production and test.
Within a group, dependencies are sorted by (groupId, artifactId) -->

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
</dependency>
<!-- Production dependencies -->
<dependency>
<groupId>com.google.api</groupId>
@@ -70,6 +70,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
@@ -89,6 +90,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -152,6 +154,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats)
throws IOException {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();
StatsWrapper statsWrapper = StatsWrapper.get();

// TODO: this implementation is on the cusp of unwieldy, if we end up adding more features
// consider splitting it up by feature.
@@ -194,6 +197,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", settings.getProjectId())
.put("instance_id", settings.getInstanceId())
.put("app_profile", settings.getAppProfileId())
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
@@ -218,6 +227,7 @@
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
BuiltinMetricsTracerFactory.create(statsWrapper, builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
@@ -466,7 +476,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(tracedBatcher);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
@@ -594,11 +604,11 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcher);

new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

@@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;
@@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/** annotate when onRequest is called */
public void onRequest() {
// noop
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}

/** Set the Bigtable zone and cluster so metrics can be tagged with location information. */
public void setLocations(String zone, String cluster) {
// noop
}
}
@@ -0,0 +1,244 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.Stopwatch;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A {@link BigtableTracer} that records built-in metrics and publishes them under the
* bigtable.googleapis.com/client namespace.
*/
class BuiltinMetricsTracer extends BigtableTracer {

private StatsRecorderWrapper recorder;

private final ApiTracerFactory.OperationType operationType;
private final SpanName spanName;

// Operation level metrics
private final AtomicBoolean opFinished = new AtomicBoolean();
private final Stopwatch operationTimer = Stopwatch.createStarted();
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();

// Attempt level metrics
private int attemptCount = 0;
private Stopwatch attemptTimer;
private volatile int attempt = 0;

// Total application latency
private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted();
private final AtomicLong totalApplicationLatency = new AtomicLong(0);

// Monitored resource labels
private String tableId = "undefined";
private String zone = "undefined";
private String cluster = "undefined";

// gfe stats
private AtomicLong gfeMissingHeaders = new AtomicLong(0);

BuiltinMetricsTracer(
ApiTracerFactory.OperationType operationType,
SpanName spanName,
Map<String, String> attributes,
StatsWrapper statsWrapper,
StatsRecorderWrapper recorder) {
this.operationType = operationType;
this.spanName = spanName;
if (recorder != null) {
this.recorder = recorder;
} else {
this.recorder = new StatsRecorderWrapper(operationType, spanName, attributes, statsWrapper);
}
}

@Override
public Scope inScope() {
return new Scope() {
@Override
public void close() {}
};
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
}

@Override
public void operationCancelled() {
recordOperationCompletion(new CancellationException());
}

@Override
public void operationFailed(Throwable error) {
recordOperationCompletion(error);
}

@Override
public void attemptStarted(int attemptNumber) {
attemptStarted(null, attemptNumber);
}

@Override
public void attemptStarted(Object request, int attemptNumber) {
this.attempt = attemptNumber;
attemptCount++;
attemptTimer = Stopwatch.createStarted();
if (request != null) {
this.tableId = Util.extractTableId(request);
}
if (applicationLatencyTimer.isRunning()) {
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
applicationLatencyTimer.reset();
}
}

@Override
public void attemptSucceeded() {
recordAttemptCompletion(null);
}

@Override
public void attemptCancelled() {
recordAttemptCompletion(new CancellationException());
}

@Override
public void attemptFailed(Throwable error, Duration delay) {
if (!applicationLatencyTimer.isRunning()) {
applicationLatencyTimer.start();
}
recordAttemptCompletion(error);
}

@Override
public void attemptFailedRetriesExhausted(Throwable error) {
super.attemptFailedRetriesExhausted(error);
}

@Override
public void attemptPermanentFailure(Throwable error) {
super.attemptPermanentFailure(error);
}

@Override
public void lroStartFailed(Throwable error) {
super.lroStartFailed(error);
}

@Override
public void lroStartSucceeded() {
super.lroStartSucceeded();
}

@Override
public void onRequest() {
if (applicationLatencyTimer.isRunning()) {
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
applicationLatencyTimer.reset();
}
}

@Override
public void responseReceived() {
if (!applicationLatencyTimer.isRunning()) {
applicationLatencyTimer.start();
}
if (firstResponsePerOpTimer.isRunning()) {
firstResponsePerOpTimer.stop();
}
}

@Override
public void requestSent() {
super.requestSent();
}

@Override
public void batchRequestSent(long elementCount, long requestSize) {
super.batchRequestSent(elementCount, requestSize);
}

@Override
public int getAttempt() {
return attempt;
}

@Override
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
// Record the metrics and put in the map after the attempt is done, so we can have cluster and
// zone information
if (latency != null) {
recorder.putGfeLatencies(latency);
} else {
gfeMissingHeaders.incrementAndGet();
}
recorder.putGfeMissingHeaders(gfeMissingHeaders.get());
}

@Override
public void setLocations(String zone, String cluster) {
this.zone = zone;
this.cluster = cluster;
}

@Override
public void batchRequestThrottled(long throttledTimeMs) {
recorder.putBatchRequestThrottled(throttledTimeMs);
}

private void recordOperationCompletion(@Nullable Throwable status) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();

recorder.putRetryCount(attemptCount);

if (applicationLatencyTimer.isRunning()) {
applicationLatencyTimer.stop();
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
}
recorder.putApplicationLatencies(totalApplicationLatency.get());

recorder.putOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS));

if (operationType == ApiTracerFactory.OperationType.ServerStreaming
&& spanName.getMethodName().equals("ReadRows")) {
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
}

recorder.record(Util.extractStatus(status), tableId, zone, cluster);
}

private void recordAttemptCompletion(@Nullable Throwable status) {
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));

recorder.record(Util.extractStatus(status), tableId, zone, cluster);
}
}
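
For reference, a condensed sketch of the tracer wiring this commit adds in finalizeSettings. It only rearranges code already shown in the diff above; the ImmutableList.of(...) wrapper around the composite factory's children is an assumption, since that part of the hunk is collapsed.

ImmutableMap<String, String> builtinAttributes =
    ImmutableMap.<String, String>builder()
        .put("project_id", settings.getProjectId())
        .put("instance_id", settings.getInstanceId())
        .put("app_profile", settings.getAppProfileId())
        .build();

builder.setTracerFactory(
    new CompositeTracerFactory(
        ImmutableList.of( // assumed; the collapsed hunk hides the list construction
            // existing OpenCensus metrics
            MetricsTracerFactory.create(tagger, stats, attributes),
            // new built-in metrics, published under bigtable.googleapis.com/client
            BuiltinMetricsTracerFactory.create(StatsWrapper.get(), builtinAttributes),
            // user-configured tracer
            settings.getTracerFactory())));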