retrying =
- Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
+ Callables.retrying(
+ withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index 3d7707cc4c..421c5b4724 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
-@BetaApi("This surface is stable yet it might be removed in the future.")
+@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {
private volatile int attempt = 0;
@@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}
+ /** annotate when onRequest is called */
+ public void onRequest() {
+ // noop
+ }
+
/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}
+
+ /** Set the Bigtable zone and cluster so metrics can be tagged with location information. */
+ public void setLocations(String zone, String cluster) {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
similarity index 53%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
index 31c5cf1960..80731a2fa0 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -26,26 +26,28 @@
import javax.annotation.Nonnull;
/**
- * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
- * returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
- * were injected in the header/trailer and publish them to OpenCensus. If {@link
- * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
- * reached GFE, and it'll increment the gfe_header_missing_counter in this case.
+ * This callable will:
*
- * If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
- * This is for the case where direct path is enabled, all the requests won't go through GFE and
- * therefore won't have the server-timing header.
+ *
- Inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
+ * methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
+ * the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
+ * returned null, it probably means that the request has never reached GFE, and it'll increment the
+ * gfe_header_missing_counter in this case.
+ *
+ *
- Call {@link BigtableTracer#onRequest()} to record the request events in a stream.
+ *
+ *
- Get Bigtable zone and cluster information from response trailer and record in tracer.
*
*
This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
-public class HeaderTracerStreamingCallable
+public class BigtableTracerStreamingCallable
extends ServerStreamingCallable {
private final ServerStreamingCallable innerCallable;
- public HeaderTracerStreamingCallable(
+ public BigtableTracerStreamingCallable(
@Nonnull ServerStreamingCallable callable) {
this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set");
}
@@ -55,9 +57,9 @@ public void call(
RequestT request, ResponseObserver responseObserver, ApiCallContext context) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
// tracer should always be an instance of bigtable tracer
- if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) {
- HeaderTracerResponseObserver innerObserver =
- new HeaderTracerResponseObserver<>(
+ if (context.getTracer() instanceof BigtableTracer) {
+ BigtableTracerResponseObserver innerObserver =
+ new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
} else {
@@ -65,13 +67,13 @@ public void call(
}
}
- private class HeaderTracerResponseObserver implements ResponseObserver {
+ private class BigtableTracerResponseObserver implements ResponseObserver {
private final BigtableTracer tracer;
private final ResponseObserver outerObserver;
private final GrpcResponseMetadata responseMetadata;
- HeaderTracerResponseObserver(
+ BigtableTracerResponseObserver(
ResponseObserver observer,
BigtableTracer tracer,
GrpcResponseMetadata metadata) {
@@ -82,7 +84,8 @@ private class HeaderTracerResponseObserver implements ResponseObserve
@Override
public void onStart(final StreamController controller) {
- outerObserver.onStart(controller);
+ final StreamController tracedController = new TracedStreamController(controller, tracer);
+ outerObserver.onStart(tracedController);
}
@Override
@@ -97,6 +100,16 @@ public void onError(Throwable t) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
+ // try {
+ // byte[] trailers =
+ // responseMetadata
+ // .getTrailingMetadata()
+ // .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+ // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
+ // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
+ // } catch (NullPointerException | InvalidProtocolBufferException e) {
+ // }
+
outerObserver.onError(t);
}
@@ -105,7 +118,42 @@ public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
+ // try {
+ // byte[] trailers =
+ // responseMetadata.getTrailingMetadata().get(Metadata.Key.of(Util.TRAILER_KEY,
+ // Metadata.BINARY_BYTE_MARSHALLER));
+ // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
+ // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
+ // } catch (NullPointerException | InvalidProtocolBufferException e) {
+ // }
+
outerObserver.onComplete();
}
}
+
+ private class TracedStreamController implements StreamController {
+ private final StreamController innerController;
+ private final BigtableTracer tracer;
+
+ TracedStreamController(StreamController innerController, BigtableTracer tracer) {
+ this.innerController = innerController;
+ this.tracer = tracer;
+ }
+
+ @Override
+ public void cancel() {
+ innerController.cancel();
+ }
+
+ @Override
+ public void disableAutoInboundFlowControl() {
+ innerController.disableAutoInboundFlowControl();
+ }
+
+ @Override
+ public void request(int i) {
+ tracer.onRequest();
+ innerController.request(i);
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
similarity index 55%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
index 6335b433ef..a434a97b3c 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
@@ -28,37 +28,37 @@
import javax.annotation.Nonnull;
/**
- * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
- * returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
- * were injected in the header/trailer and publish them to OpenCensus. If {@link
- * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
- * reached GFE, and it'll increment the gfe_header_missing_counter in this case.
+ * This callable will:
*
- * If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
- * This is for the case where direct path is enabled, all the requests won't go through GFE and
- * therefore won't have the server-timing header.
+ *
- Inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
+ * methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
+ * the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
+ * returned null, it probably means that the request has never reached GFE, and it'll increment the
+ * gfe_header_missing_counter in this case.
+ *
+ *
- Get Bigtable zone and cluster information from response trailer and record in tracer.
*
*
This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
-public class HeaderTracerUnaryCallable
+public class BigtableTracerUnaryCallable
extends UnaryCallable {
private final UnaryCallable innerCallable;
- public HeaderTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) {
+ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
}
@Override
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
// tracer should always be an instance of BigtableTracer
- if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) {
+ if (context.getTracer() instanceof BigtableTracer) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context);
- HeaderTracerUnaryCallback callback =
- new HeaderTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
+ BigtableTracerUnaryCallback callback =
+ new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
ApiFuture future = innerCallable.futureCall(request, contextWithResponseMetadata);
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
@@ -67,12 +67,12 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) {
}
}
- class HeaderTracerUnaryCallback implements ApiFutureCallback {
+ class BigtableTracerUnaryCallback implements ApiFutureCallback {
private final BigtableTracer tracer;
private final GrpcResponseMetadata responseMetadata;
- HeaderTracerUnaryCallback(BigtableTracer tracer, GrpcResponseMetadata responseMetadata) {
+ BigtableTracerUnaryCallback(BigtableTracer tracer, GrpcResponseMetadata responseMetadata) {
this.tracer = tracer;
this.responseMetadata = responseMetadata;
}
@@ -82,6 +82,16 @@ public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
+ // try {
+ // byte[] trailers =
+ // responseMetadata
+ // .getTrailingMetadata()
+ // .get(Metadata.Key.of(Util.TRAILER_KEY,
+ // Metadata.BINARY_BYTE_MARSHALLER));
+ // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
+ // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
+ // } catch (NullPointerException | InvalidProtocolBufferException e) {
+ // }
}
@Override
@@ -89,6 +99,15 @@ public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
+ // try {
+ // byte[] trailers =
+ // responseMetadata
+ // .getTrailingMetadata()
+ // .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+ // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
+ // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
+ // } catch (NullPointerException | InvalidProtocolBufferException e) {
+ // }
}
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
new file mode 100644
index 0000000000..d87d0ffc6a
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.base.Stopwatch;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
+
+public class BuiltinMetricsTracer extends BigtableTracer {
+
+ private BuiltinMetricsRecorder recorder;
+
+ private final ApiTracerFactory.OperationType operationType;
+ private final SpanName spanName;
+
+ // Operation level metrics
+ private final AtomicBoolean opFinished = new AtomicBoolean();
+ private final Stopwatch operationTimer = Stopwatch.createStarted();
+ private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
+
+ // Attempt level metrics
+ private int attemptCount = 0;
+ private Stopwatch attemptTimer;
+ private volatile int attempt = 0;
+
+ // Total application latency
+ private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted();
+ private final AtomicLong totalApplicationLatency = new AtomicLong(0);
+
+ // Monitored resource labels
+ private String tableId = "undefined";
+ private String zone = "undefined";
+ private String cluster = "undefined";
+
+ // gfe stats
+ private AtomicLong gfeMissingHeaders = new AtomicLong(0);
+
+ BuiltinMetricsTracer(
+ ApiTracerFactory.OperationType operationType,
+ SpanName spanName,
+ Map attributes,
+ StatsWrapper builtinMetricsWrapper,
+ BuiltinMetricsRecorder recorder) {
+ this.operationType = operationType;
+ this.spanName = spanName;
+ if (recorder != null) {
+ this.recorder = recorder;
+ } else {
+ this.recorder =
+ new BuiltinMetricsRecorder(operationType, spanName, attributes, builtinMetricsWrapper);
+ }
+ }
+
+ @Override
+ public Scope inScope() {
+ return new Scope() {
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public void operationSucceeded() {
+ recordOperationCompletion(null);
+ }
+
+ @Override
+ public void operationCancelled() {
+ recordOperationCompletion(new CancellationException());
+ }
+
+ @Override
+ public void operationFailed(Throwable error) {
+ recordOperationCompletion(error);
+ }
+
+ @Override
+ public void attemptStarted(int attemptNumber) {
+ attemptStarted(null, attemptNumber);
+ }
+
+ @Override
+ public void attemptStarted(Object request, int attemptNumber) {
+ this.attempt = attemptNumber;
+ attemptCount++;
+ attemptTimer = Stopwatch.createStarted();
+ if (request != null) {
+ this.tableId = Util.extractTableId(request);
+ }
+ if (applicationLatencyTimer.isRunning()) {
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ applicationLatencyTimer.reset();
+ }
+ }
+
+ @Override
+ public void attemptSucceeded() {
+ recordAttemptCompletion(null);
+ }
+
+ @Override
+ public void attemptCancelled() {
+ recordAttemptCompletion(new CancellationException());
+ }
+
+ @Override
+ public void attemptFailed(Throwable error, Duration delay) {
+ if (!applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.start();
+ }
+ recordAttemptCompletion(error);
+ }
+
+ @Override
+ public void attemptFailedRetriesExhausted(Throwable error) {
+ super.attemptFailedRetriesExhausted(error);
+ }
+
+ @Override
+ public void attemptPermanentFailure(Throwable error) {
+ super.attemptPermanentFailure(error);
+ }
+
+ @Override
+ public void lroStartFailed(Throwable error) {
+ super.lroStartFailed(error);
+ }
+
+ @Override
+ public void lroStartSucceeded() {
+ super.lroStartSucceeded();
+ }
+
+ @Override
+ public void onRequest() {
+ if (applicationLatencyTimer.isRunning()) {
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ applicationLatencyTimer.reset();
+ }
+ }
+
+ @Override
+ public void responseReceived() {
+ if (!applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.start();
+ }
+ if (firstResponsePerOpTimer.isRunning()) {
+ firstResponsePerOpTimer.stop();
+ }
+ }
+
+ @Override
+ public void requestSent() {
+ super.requestSent();
+ }
+
+ @Override
+ public void batchRequestSent(long elementCount, long requestSize) {
+ super.batchRequestSent(elementCount, requestSize);
+ }
+
+ @Override
+ public int getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
+ // Record the metrics and put in the map after the attempt is done, so we can have cluster and
+ // zone information
+ if (latency != null) {
+ recorder.recordGfeLatencies(latency);
+ } else {
+ gfeMissingHeaders.incrementAndGet();
+ }
+ recorder.recordGfeMissingHeaders(gfeMissingHeaders.get());
+ }
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ this.zone = zone;
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void batchRequestThrottled(long throttledTimeMs) {
+ recorder.recordBatchRequestThrottled(throttledTimeMs, tableId, zone, cluster);
+ }
+
+ private void recordOperationCompletion(@Nullable Throwable status) {
+ if (!opFinished.compareAndSet(false, true)) {
+ return;
+ }
+ operationTimer.stop();
+
+ recorder.recordRetryCount(attemptCount);
+
+ if (applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.stop();
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+ recorder.recordApplicationLatency(totalApplicationLatency.get(), tableId, zone, cluster);
+
+ recorder.recordOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ if (operationType == ApiTracerFactory.OperationType.ServerStreaming
+ && spanName.getMethodName().equals("ReadRows")) {
+ recorder.recordFirstResponseLatency(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ recorder.recordOperationLevelWithoutStreaming(
+ Util.extractStatus(status), tableId, zone, cluster);
+ recorder.recordOperationLevelWithStreaming(Util.extractStatus(status), tableId, zone, cluster);
+ }
+
+ private void recordAttemptCompletion(@Nullable Throwable status) {
+ recorder.recordAttemptLatency(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ recorder.recordAttemptLevelWithoutStreaming(Util.extractStatus(status), tableId, zone, cluster);
+ recorder.recordAttemptLevelWithStreaming(Util.extractStatus(status), tableId, zone, cluster);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
new file mode 100644
index 0000000000..44671f6cea
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.BaseApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+@InternalApi("For internal use only")
+public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {
+
+ private final ImmutableMap statsAttributes;
+ private final StatsWrapper statsWrapper;
+ private final BuiltinMetricsRecorder builtinMetricsRecorder;
+
+ public static BuiltinMetricsTracerFactory create(
+ StatsWrapper statsWrapper, ImmutableMap statsAttributes) {
+ return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, null);
+ }
+
+ @VisibleForTesting
+ static BuiltinMetricsTracerFactory create(
+ StatsWrapper statsWrapper,
+ ImmutableMap statsAttributes,
+ BuiltinMetricsRecorder recorder) {
+ return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, recorder);
+ }
+
+ private BuiltinMetricsTracerFactory(
+ StatsWrapper statsWrapper,
+ ImmutableMap statsAttributes,
+ BuiltinMetricsRecorder recorder) {
+ this.statsAttributes = statsAttributes;
+ this.statsWrapper = statsWrapper;
+ this.builtinMetricsRecorder = recorder;
+ }
+
+ @Override
+ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
+ return new BuiltinMetricsTracer(
+ operationType, spanName, statsAttributes, statsWrapper, builtinMetricsRecorder);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
index 5f4580743b..29c16baafb 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
@@ -92,9 +92,14 @@ public void connectionSelected(String id) {
@Override
public void attemptStarted(int attemptNumber) {
+ attemptStarted(null, attemptNumber);
+ }
+
+ @Override
+ public void attemptStarted(Object request, int attemptNumber) {
this.attempt = attemptNumber;
for (ApiTracer child : children) {
- child.attemptStarted(attemptNumber);
+ child.attemptStarted(request, attemptNumber);
}
}
@@ -185,4 +190,18 @@ public void batchRequestThrottled(long throttledTimeMs) {
tracer.batchRequestThrottled(throttledTimeMs);
}
}
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.setLocations(zone, cluster);
+ }
+ }
+
+ @Override
+ public void onRequest() {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.onRequest();
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
index f28b07c0cb..d27941bb71 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
@@ -118,7 +118,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
TagContextBuilder tagCtx =
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
measures.record(tagCtx.build());
}
@@ -171,7 +173,9 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) {
TagContextBuilder tagCtx =
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
measures.record(tagCtx.build());
}
@@ -222,7 +226,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
}
measures.record(
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable))
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable)))
.build());
}
@@ -248,4 +253,14 @@ private TagContextBuilder newTagCtxBuilder() {
return tagCtx;
}
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ // noop
+ }
+
+ @Override
+ public void onRequest() {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
index 00995b717a..bfd64ee958 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
@@ -15,10 +15,19 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
+import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.TableName;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -38,7 +47,8 @@
import javax.annotation.Nullable;
/** Utilities to help integrating with OpenCensus. */
-class Util {
+@InternalApi("For internal use only")
+public class Util {
static final Metadata.Key ATTEMPT_HEADER_KEY =
Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key ATTEMPT_EPOCH_KEY =
@@ -48,14 +58,14 @@ class Util {
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)");
- private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());
+ static final String TRAILER_KEY = "x-goog-ext-425905942-bin";
- /** Convert an exception into a value that can be used as an OpenCensus tag value. */
- static TagValue extractStatus(@Nullable Throwable error) {
+ /** Convert an exception into a value that can be used to create an OpenCensus tag value. */
+ static String extractStatus(@Nullable Throwable error) {
final String statusString;
if (error == null) {
- return OK_STATUS;
+ return StatusCode.Code.OK.toString();
} else if (error instanceof CancellationException) {
statusString = Status.Code.CANCELLED.toString();
} else if (error instanceof ApiException) {
@@ -68,14 +78,14 @@ static TagValue extractStatus(@Nullable Throwable error) {
statusString = Code.UNKNOWN.toString();
}
- return TagValue.create(statusString);
+ return statusString;
}
/**
* Await the result of the future and convert it into a value that can be used as an OpenCensus
* tag value.
*/
- static TagValue extractStatus(Future> future) {
+ static TagValue extractStatusAsync(Future> future) {
Throwable error = null;
try {
@@ -88,7 +98,25 @@ static TagValue extractStatus(Future> future) {
} catch (RuntimeException e) {
error = e;
}
- return extractStatus(error);
+ return TagValue.create(extractStatus(error));
+ }
+
+ static String extractTableId(Object request) {
+ String tableName = null;
+ if (request instanceof ReadRowsRequest) {
+ tableName = ((ReadRowsRequest) request).getTableName();
+ } else if (request instanceof MutateRowsRequest) {
+ tableName = ((MutateRowsRequest) request).getTableName();
+ } else if (request instanceof MutateRowRequest) {
+ tableName = ((MutateRowRequest) request).getTableName();
+ } else if (request instanceof SampleRowKeysRequest) {
+ tableName = ((SampleRowKeysRequest) request).getTableName();
+ } else if (request instanceof CheckAndMutateRowRequest) {
+ tableName = ((CheckAndMutateRowRequest) request).getTableName();
+ } else if (request instanceof ReadModifyWriteRowRequest) {
+ tableName = ((ReadModifyWriteRowRequest) request).getTableName();
+ }
+ return !Strings.isNullOrEmpty(tableName) ? TableName.parse(tableName).getTable() : "undefined";
}
/**
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java
similarity index 95%
rename from google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java
rename to google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java
index 03aad7f822..eba8411189 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java
@@ -55,7 +55,6 @@
import io.grpc.stub.StreamObserver;
import io.opencensus.impl.stats.StatsComponentImpl;
import io.opencensus.stats.StatsComponent;
-import io.opencensus.stats.ViewData;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tags;
@@ -68,7 +67,7 @@
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-public class HeaderTracerCallableTest {
+public class BigtableTracerCallableTest {
private FakeServiceHelper serviceHelper;
private FakeServiceHelper serviceHelperNoHeader;
@@ -383,24 +382,6 @@ public void testMetricsWithErrorResponse() throws InterruptedException {
assertThat(missingCount).isEqualTo(attempts);
}
- @Test
- public void testCallableBypassed() throws InterruptedException {
- RpcViews.setGfeMetricsRegistered(false);
- stub.readRowsCallable().call(Query.create(TABLE_ID));
- Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
- ViewData headerMissingView =
- localStats
- .getViewManager()
- .getView(RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW.getName());
- ViewData latencyView =
- localStats.getViewManager().getView(RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW.getName());
- // Verify that the view is registered by it's not collecting metrics
- assertThat(headerMissingView).isNotNull();
- assertThat(latencyView).isNotNull();
- assertThat(headerMissingView.getAggregationMap()).isEmpty();
- assertThat(latencyView.getAggregationMap()).isEmpty();
- }
-
private class FakeService extends BigtableImplBase {
private final String defaultTableName =
NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
new file mode 100644
index 0000000000..06e756ba5a
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.api.client.util.Lists;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ClientContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.ForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.threeten.bp.Duration;
+
+public class BuiltinMetricsTracerTest {
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+ private static final String ZONE_0 = "us-west-1";
+ private static final String CLUSTER_0 = "cluster-0";
+ private static final String ZONE_1 = "us-east-1";
+ private static final String CLUSTER_1 = "cluster-1";
+ private static final String UNDEFINED = "undefined";
+ private static final long FAKE_SERVER_TIMING = 50;
+ private static final long SERVER_LATENCY = 500;
+
+ private final AtomicInteger rpcCount = new AtomicInteger(0);
+
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_1 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-1"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_2 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-2"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_3 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-3"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ private FakeServiceHelper serviceHelper;
+
+ private BigtableGrpc.BigtableImplBase mockService;
+
+ private EnhancedBigtableStub stub;
+
+ private BuiltinMetricsRecorder builtinMetricsRecorder;
+ private ArgumentCaptor longValue;
+ private ArgumentCaptor intValue;
+ private ArgumentCaptor status;
+ private ArgumentCaptor tableId;
+ private ArgumentCaptor zone;
+ private ArgumentCaptor cluster;
+
+ private Stopwatch serverRetryDelayStopwatch;
+ private AtomicLong serverTotalRetryDelay;
+
+ @Before
+ public void setUp() throws Exception {
+ mockService = new FakeService();
+
+ serverRetryDelayStopwatch = Stopwatch.createUnstarted();
+ serverTotalRetryDelay = new AtomicLong(0);
+
+ // Add an interceptor to send location information in the trailers and add server-timing in
+ // headers
+ ServerInterceptor trailersInterceptor =
+ new ServerInterceptor() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public ServerCall.Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ return serverCallHandler.startCall(
+ new ForwardingServerCall.SimpleForwardingServerCall(serverCall) {
+ @Override
+ public void sendHeaders(Metadata headers) {
+ headers.put(
+ Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
+ String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING));
+ super.sendHeaders(headers);
+ }
+
+ @Override
+ public void close(Status status, Metadata trailers) {
+ // int currentCount = count.getAndIncrement();
+ // if (currentCount == 0) {
+ // ResponseParams params =
+ // ResponseParams.newBuilder()
+ // .setZoneId(ZONE_0)
+ // .setClusterId(CLUSTER_0)
+ // .build();
+ // byte[] byteArray = params.toByteArray();
+ // trailers.put(
+ // Metadata.Key.of(Util.TRAILER_KEY,
+ // Metadata.BINARY_BYTE_MARSHALLER),
+ // byteArray);
+ // } else {
+ // ResponseParams params =
+ // ResponseParams.newBuilder()
+ // .setClusterId(CLUSTER_1)
+ // .setZoneId(ZONE_1)
+ // .build();
+ // byte[] byteArray = params.toByteArray();
+ // trailers.put(
+ // Metadata.Key.of(Util.TRAILER_KEY,
+ // Metadata.BINARY_BYTE_MARSHALLER),
+ // byteArray);
+ // }
+ super.close(status, trailers);
+ }
+ },
+ metadata);
+ }
+ };
+
+ serviceHelper = new FakeServiceHelper(trailersInterceptor, mockService);
+
+ builtinMetricsRecorder = Mockito.mock(BuiltinMetricsRecorder.class);
+
+ BigtableDataSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+ EnhancedBigtableStubSettings.Builder stubSettingsBuilder =
+ settings.getStubSettings().toBuilder();
+ stubSettingsBuilder
+ .mutateRowSettings()
+ .retrySettings()
+ .setInitialRetryDelay(Duration.ofMillis(200));
+ stubSettingsBuilder.setTracerFactory(
+ BuiltinMetricsTracerFactory.create(
+ new StatsWrapper(false), ImmutableMap.of(), builtinMetricsRecorder));
+
+ EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
+ stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
+
+ serviceHelper.start();
+
+ longValue = ArgumentCaptor.forClass(long.class);
+ intValue = ArgumentCaptor.forClass(int.class);
+ status = ArgumentCaptor.forClass(String.class);
+ tableId = ArgumentCaptor.forClass(String.class);
+ zone = ArgumentCaptor.forClass(String.class);
+ cluster = ArgumentCaptor.forClass(String.class);
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ serviceHelper.shutdown();
+ }
+
+ @Test
+ public void testOperationLatencies() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator());
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ verify(builtinMetricsRecorder).recordOperationLatencies(longValue.capture());
+
+ assertThat(longValue.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed));
+ }
+
+ @Test
+ public void testGfeMetrics() {
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ verify(builtinMetricsRecorder).recordGfeLatencies(longValue.capture());
+ assertThat(longValue.getValue()).isEqualTo(FAKE_SERVER_TIMING);
+
+ verify(builtinMetricsRecorder).recordGfeMissingHeaders(longValue.capture());
+ assertThat(longValue.getValue()).isEqualTo(0);
+ }
+
+ @Test
+ public void testReadRowsApplicationLatency() throws Exception {
+ final long applicationLatency = 1000;
+ final SettableApiFuture future = SettableApiFuture.create();
+ final AtomicInteger counter = new AtomicInteger(0);
+ // We want to measure how long application waited before requesting another message after
+ // the previous message is returned from the server. Using ResponseObserver here so that the
+ // flow will be
+ // onResponse() -> sleep -> onRequest() (for the next message) which is exactly what we want to
+ // measure for
+ // application latency.
+ // If we do readRowsCallable().call(Query.create(TABLE_ID)).iterator() and iterate through the
+ // iterator and sleep in
+ // between responses, when the call started, the client will pre-fetch the first response, which
+ // won't be counted
+ // in application latency. So the test will be flaky and hard to debug.
+ stub.readRowsCallable()
+ .call(
+ Query.create(TABLE_ID),
+ new ResponseObserver() {
+ @Override
+ public void onStart(StreamController streamController) {}
+
+ @Override
+ public void onResponse(Row row) {
+ try {
+ counter.incrementAndGet();
+ Thread.sleep(applicationLatency);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.setException(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ future.set(null);
+ }
+ });
+ future.get();
+
+ verify(builtinMetricsRecorder)
+ .recordApplicationLatency(
+ longValue.capture(), tableId.capture(), zone.capture(), cluster.capture());
+
+ assertThat(counter.get()).isGreaterThan(0);
+ assertThat(longValue.getValue()).isAtLeast(applicationLatency * counter.get());
+ assertThat(longValue.getValue()).isLessThan(applicationLatency * (counter.get() + 1));
+ }
+
+ @Test
+ public void testMutateRowApplicationLatency() {
+ // Unary callable application latency is the delay between retries
+ stub.mutateRowCallable()
+ .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));
+
+ verify(builtinMetricsRecorder)
+ .recordApplicationLatency(
+ longValue.capture(), tableId.capture(), zone.capture(), cluster.capture());
+
+ // Application latency should be slightly less than the total delay between 2 requests observed
+ // from the server side. To make
+ // the test less flaky comparing with half of the server side delay here.
+ assertThat(longValue.getValue()).isAtLeast(serverTotalRetryDelay.get() / 2);
+ assertThat(longValue.getValue()).isAtMost(serverTotalRetryDelay.get());
+ }
+
+ @Test
+ public void testRetryCount() {
+ stub.mutateRowCallable()
+ .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));
+
+ verify(builtinMetricsRecorder).recordRetryCount(intValue.capture());
+
+ assertThat(intValue.getValue()).isEqualTo(3);
+ }
+
+ @Test
+ public void testMutateRowAttempts() {
+ stub.mutateRowCallable()
+ .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));
+
+ verify(builtinMetricsRecorder, times(3))
+ .recordAttemptLevelWithStreaming(
+ status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+ assertThat(zone.getAllValues().get(0)).isEqualTo(UNDEFINED);
+ assertThat(zone.getAllValues().get(1)).isEqualTo(UNDEFINED);
+ assertThat(zone.getAllValues().get(2)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(0)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(1)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(2)).isEqualTo(UNDEFINED);
+ assertThat(status.getAllValues().get(0)).isEqualTo("UNAVAILABLE");
+ assertThat(status.getAllValues().get(1)).isEqualTo("UNAVAILABLE");
+ assertThat(status.getAllValues().get(2)).isEqualTo("OK");
+ }
+
+ private class FakeService extends BigtableGrpc.BigtableImplBase {
+
+ @Override
+ public void readRows(
+ ReadRowsRequest request, StreamObserver responseObserver) {
+ try {
+ Thread.sleep(SERVER_LATENCY);
+ } catch (InterruptedException e) {
+ }
+ responseObserver.onNext(READ_ROWS_RESPONSE_1);
+ responseObserver.onNext(READ_ROWS_RESPONSE_2);
+ responseObserver.onNext(READ_ROWS_RESPONSE_3);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void mutateRow(
+ MutateRowRequest request, StreamObserver responseObserver) {
+ if (serverRetryDelayStopwatch.isRunning()) {
+ serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS));
+ serverRetryDelayStopwatch.reset();
+ }
+ if (rpcCount.get() < 2) {
+ responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ rpcCount.getAndIncrement();
+ serverRetryDelayStopwatch.start();
+ return;
+ }
+ responseObserver.onNext(MutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
index 69a741d0e3..0de14636c6 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
@@ -23,6 +23,7 @@
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracer.Scope;
+import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.misc_utilities.MethodComparator;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
@@ -118,11 +119,12 @@ public void testConnectionSelected() {
@Test
public void testAttemptStarted() {
- compositeTracer.attemptStarted(3);
- verify(child1, times(1)).attemptStarted(3);
- verify(child2, times(1)).attemptStarted(3);
- verify(child3, times(1)).attemptStarted(3);
- verify(child4, times(1)).attemptStarted(3);
+ ReadRowsRequest request = ReadRowsRequest.getDefaultInstance();
+ compositeTracer.attemptStarted(request, 3);
+ verify(child1, times(1)).attemptStarted(request, 3);
+ verify(child2, times(1)).attemptStarted(request, 3);
+ verify(child3, times(1)).attemptStarted(request, 3);
+ verify(child4, times(1)).attemptStarted(request, 3);
}
@Test
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
index efef3b67d2..134a7604de 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
@@ -30,13 +30,13 @@
public class UtilTest {
@Test
public void testOk() {
- TagValue tagValue = Util.extractStatus((Throwable) null);
+ TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null));
assertThat(tagValue.asString()).isEqualTo("OK");
}
@Test
public void testOkFuture() {
- TagValue tagValue = Util.extractStatus(Futures.immediateFuture(null));
+ TagValue tagValue = Util.extractStatusAsync(Futures.immediateFuture(null));
assertThat(tagValue.asString()).isEqualTo("OK");
}
@@ -45,7 +45,7 @@ public void testError() {
DeadlineExceededException error =
new DeadlineExceededException(
"Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
- TagValue tagValue = Util.extractStatus(error);
+ TagValue tagValue = TagValue.create(Util.extractStatus(error));
assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
}
@@ -54,13 +54,13 @@ public void testErrorFuture() {
DeadlineExceededException error =
new DeadlineExceededException(
"Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
- TagValue tagValue = Util.extractStatus(Futures.immediateFailedFuture(error));
+ TagValue tagValue = Util.extractStatusAsync(Futures.immediateFailedFuture(error));
assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
}
@Test
public void testCancelledFuture() {
- TagValue tagValue = Util.extractStatus(Futures.immediateCancelledFuture());
+ TagValue tagValue = Util.extractStatusAsync(Futures.immediateCancelledFuture());
assertThat(tagValue.asString()).isEqualTo("CANCELLED");
}
}
diff --git a/pom.xml b/pom.xml
index dbfddb8ee7..57fffc3e4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,6 +212,15 @@
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+