diff --git a/google-cloud-bigtable-bom/pom.xml b/google-cloud-bigtable-bom/pom.xml
index c43f462047..5d1d219f4e 100644
--- a/google-cloud-bigtable-bom/pom.xml
+++ b/google-cloud-bigtable-bom/pom.xml
@@ -94,6 +94,11 @@
proto-google-cloud-bigtable-v2
2.8.1-SNAPSHOT
+
+ com.google.cloud
+ google-cloud-bigtable-stats
+ 2.8.1-SNAPSHOT
+
diff --git a/google-cloud-bigtable-stats/pom.xml b/google-cloud-bigtable-stats/pom.xml
index cb88f01c19..4b0119eaf4 100644
--- a/google-cloud-bigtable-stats/pom.xml
+++ b/google-cloud-bigtable-stats/pom.xml
@@ -79,6 +79,7 @@
org.apache.maven.plugins
maven-shade-plugin
+ 3.2.4
package
diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml
index 0a6c973c59..1c01dfc1ee 100644
--- a/google-cloud-bigtable/pom.xml
+++ b/google-cloud-bigtable/pom.xml
@@ -61,6 +61,10 @@
+
+ com.google.cloud
+ google-cloud-bigtable-stats
+
com.google.api
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index d8daaa80e6..4087dc59b4 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -70,6 +70,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
@@ -89,6 +90,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
+import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -152,6 +154,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats)
throws IOException {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();
+ StatsWrapper statsWrapper = StatsWrapper.get();
// TODO: this implementation is on the cusp of unwieldy, if we end up adding more features
// consider splitting it up by feature.
@@ -194,6 +197,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
+ ImmutableMap builtinAttributes =
+ ImmutableMap.builder()
+ .put("project_id", settings.getProjectId())
+ .put("instance_id", settings.getInstanceId())
+ .put("app_profile", settings.getAppProfileId())
+ .build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
@@ -218,6 +227,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
+ BuiltinMetricsTracerFactory.create(statsWrapper, builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
@@ -466,7 +476,7 @@ private UnaryCallable> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());
UnaryCallable> withHeaderTracer =
- new HeaderTracerUnaryCallable(tracedBatcher);
+ new HeaderTracerUnaryCallable<>(tracedBatcher);
UnaryCallable> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
@@ -594,11 +604,11 @@ private UnaryCallable createBulkMutateRowsCallable() {
SpanName spanName = getSpanName("MutateRows");
- UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
+ UnaryCallable tracedBatcherUnaryCallable =
+ new TracedBatcherUnaryCallable<>(userFacing);
UnaryCallable withHeaderTracer =
- new HeaderTracerUnaryCallable<>(tracedBatcher);
-
+ new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index 3d7707cc4c..421c5b4724 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
-@BetaApi("This surface is stable yet it might be removed in the future.")
+@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {
private volatile int attempt = 0;
@@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}
+ /** annotate when onRequest is called */
+ public void onRequest() {
+ // noop
+ }
+
/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}
+
+ /** Set the Bigtable zone and cluster so metrics can be tagged with location information. */
+ public void setLocations(String zone, String cluster) {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
new file mode 100644
index 0000000000..d369cbfcb6
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.base.Stopwatch;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
+
+/**
+ * A {@link BigtableTracer} that records built-in metrics and publish under the
+ * bigtable.googleapis.com/client namespace
+ */
+class BuiltinMetricsTracer extends BigtableTracer {
+
+ private final StatsRecorderWrapper recorder;
+
+ private final ApiTracerFactory.OperationType operationType;
+ private final SpanName spanName;
+
+ // Operation level metrics
+ private final AtomicBoolean opFinished = new AtomicBoolean();
+ private final Stopwatch operationTimer = Stopwatch.createStarted();
+ private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
+
+ // Attempt level metrics
+ private int attemptCount = 0;
+ private Stopwatch attemptTimer;
+ private volatile int attempt = 0;
+
+ // Total application latency
+ private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted();
+ private final AtomicLong totalApplicationLatency = new AtomicLong(0);
+
+ // Monitored resource labels
+ private String tableId = "undefined";
+ private String zone = "undefined";
+ private String cluster = "undefined";
+
+ // gfe stats
+ private AtomicLong gfeMissingHeaders = new AtomicLong(0);
+
+ BuiltinMetricsTracer(
+ ApiTracerFactory.OperationType operationType,
+ SpanName spanName,
+ Map attributes,
+ StatsWrapper statsWrapper,
+ StatsRecorderWrapper recorder) {
+ this.operationType = operationType;
+ this.spanName = spanName;
+ if (recorder != null) {
+ this.recorder = recorder;
+ } else {
+ this.recorder = new StatsRecorderWrapper(operationType, spanName, attributes, statsWrapper);
+ }
+ }
+
+ @Override
+ public Scope inScope() {
+ return new Scope() {
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public void operationSucceeded() {
+ recordOperationCompletion(null);
+ }
+
+ @Override
+ public void operationCancelled() {
+ recordOperationCompletion(new CancellationException());
+ }
+
+ @Override
+ public void operationFailed(Throwable error) {
+ recordOperationCompletion(error);
+ }
+
+ @Override
+ public void attemptStarted(int attemptNumber) {
+ attemptStarted(null, attemptNumber);
+ }
+
+ @Override
+ public void attemptStarted(Object request, int attemptNumber) {
+ this.attempt = attemptNumber;
+ attemptCount++;
+ attemptTimer = Stopwatch.createStarted();
+ if (request != null) {
+ this.tableId = Util.extractTableId(request);
+ }
+ if (applicationLatencyTimer.isRunning()) {
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ applicationLatencyTimer.reset();
+ }
+ }
+
+ @Override
+ public void attemptSucceeded() {
+ recordAttemptCompletion(null);
+ }
+
+ @Override
+ public void attemptCancelled() {
+ recordAttemptCompletion(new CancellationException());
+ }
+
+ @Override
+ public void attemptFailed(Throwable error, Duration delay) {
+ if (!applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.start();
+ }
+ recordAttemptCompletion(error);
+ }
+
+ @Override
+ public void attemptFailedRetriesExhausted(Throwable error) {
+ super.attemptFailedRetriesExhausted(error);
+ }
+
+ @Override
+ public void attemptPermanentFailure(Throwable error) {
+ super.attemptPermanentFailure(error);
+ }
+
+ @Override
+ public void lroStartFailed(Throwable error) {
+ super.lroStartFailed(error);
+ }
+
+ @Override
+ public void lroStartSucceeded() {
+ super.lroStartSucceeded();
+ }
+
+ @Override
+ public void onRequest() {
+ if (applicationLatencyTimer.isRunning()) {
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ applicationLatencyTimer.reset();
+ }
+ }
+
+ @Override
+ public void responseReceived() {
+ if (!applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.start();
+ }
+ if (firstResponsePerOpTimer.isRunning()) {
+ firstResponsePerOpTimer.stop();
+ }
+ }
+
+ @Override
+ public void requestSent() {
+ super.requestSent();
+ }
+
+ @Override
+ public void batchRequestSent(long elementCount, long requestSize) {
+ super.batchRequestSent(elementCount, requestSize);
+ }
+
+ @Override
+ public int getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
+ // Record the metrics and put in the map after the attempt is done, so we can have cluster and
+ // zone information
+ if (latency != null) {
+ recorder.putGfeLatencies(latency);
+ } else {
+ gfeMissingHeaders.incrementAndGet();
+ }
+ recorder.putGfeMissingHeaders(gfeMissingHeaders.get());
+ }
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ this.zone = zone;
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void batchRequestThrottled(long throttledTimeMs) {
+ recorder.putBatchRequestThrottled(throttledTimeMs);
+ }
+
+ private void recordOperationCompletion(@Nullable Throwable status) {
+ if (!opFinished.compareAndSet(false, true)) {
+ return;
+ }
+ operationTimer.stop();
+
+ recorder.putRetryCount(attemptCount);
+
+ if (applicationLatencyTimer.isRunning()) {
+ applicationLatencyTimer.stop();
+ totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+ recorder.putApplicationLatencies(totalApplicationLatency.get());
+
+ recorder.putOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ if (operationType == ApiTracerFactory.OperationType.ServerStreaming
+ && spanName.getMethodName().equals("ReadRows")) {
+ recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ recorder.record(Util.extractStatus(status), tableId, zone, cluster);
+ }
+
+ private void recordAttemptCompletion(@Nullable Throwable status) {
+ recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ recorder.record(Util.extractStatus(status), tableId, zone, cluster);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
new file mode 100644
index 0000000000..3583733a45
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.BaseApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer}
+ * api.
+ */
+@InternalApi("For internal use only")
+public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {
+
+ private final ImmutableMap statsAttributes;
+ private final StatsWrapper statsWrapper;
+ private final StatsRecorderWrapper statsRecorderWrapper;
+
+ public static BuiltinMetricsTracerFactory create(
+ StatsWrapper statsWrapper, ImmutableMap statsAttributes) {
+ return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, null);
+ }
+
+ @VisibleForTesting
+ static BuiltinMetricsTracerFactory create(
+ StatsWrapper statsWrapper,
+ ImmutableMap statsAttributes,
+ StatsRecorderWrapper recorder) {
+ return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, recorder);
+ }
+
+ private BuiltinMetricsTracerFactory(
+ StatsWrapper statsWrapper,
+ ImmutableMap statsAttributes,
+ StatsRecorderWrapper recorder) {
+ this.statsAttributes = statsAttributes;
+ this.statsWrapper = statsWrapper;
+ this.statsRecorderWrapper = recorder;
+ }
+
+ @Override
+ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
+ return new BuiltinMetricsTracer(
+ operationType, spanName, statsAttributes, statsWrapper, statsRecorderWrapper);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
index 5f4580743b..29c16baafb 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
@@ -92,9 +92,14 @@ public void connectionSelected(String id) {
@Override
public void attemptStarted(int attemptNumber) {
+ attemptStarted(null, attemptNumber);
+ }
+
+ @Override
+ public void attemptStarted(Object request, int attemptNumber) {
this.attempt = attemptNumber;
for (ApiTracer child : children) {
- child.attemptStarted(attemptNumber);
+ child.attemptStarted(request, attemptNumber);
}
}
@@ -185,4 +190,18 @@ public void batchRequestThrottled(long throttledTimeMs) {
tracer.batchRequestThrottled(throttledTimeMs);
}
}
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.setLocations(zone, cluster);
+ }
+ }
+
+ @Override
+ public void onRequest() {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.onRequest();
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
index f28b07c0cb..d27941bb71 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
@@ -118,7 +118,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
TagContextBuilder tagCtx =
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
measures.record(tagCtx.build());
}
@@ -171,7 +173,9 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) {
TagContextBuilder tagCtx =
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
measures.record(tagCtx.build());
}
@@ -222,7 +226,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
}
measures.record(
newTagCtxBuilder()
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable))
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable)))
.build());
}
@@ -248,4 +253,14 @@ private TagContextBuilder newTagCtxBuilder() {
return tagCtx;
}
+
+ @Override
+ public void setLocations(String zone, String cluster) {
+ // noop
+ }
+
+ @Override
+ public void onRequest() {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
index 00995b717a..0440029027 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
@@ -15,10 +15,19 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
+import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.TableName;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -38,7 +47,8 @@
import javax.annotation.Nullable;
/** Utilities to help integrating with OpenCensus. */
-class Util {
+@InternalApi("For internal use only")
+public class Util {
static final Metadata.Key ATTEMPT_HEADER_KEY =
Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key ATTEMPT_EPOCH_KEY =
@@ -48,14 +58,14 @@ class Util {
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)");
- private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());
+ static final String TRAILER_KEY = "x-goog-ext-425905942-bin";
- /** Convert an exception into a value that can be used as an OpenCensus tag value. */
- static TagValue extractStatus(@Nullable Throwable error) {
+ /** Convert an exception into a value that can be used to create an OpenCensus tag value. */
+ static String extractStatus(@Nullable Throwable error) {
final String statusString;
if (error == null) {
- return OK_STATUS;
+ return StatusCode.Code.OK.toString();
} else if (error instanceof CancellationException) {
statusString = Status.Code.CANCELLED.toString();
} else if (error instanceof ApiException) {
@@ -68,14 +78,14 @@ static TagValue extractStatus(@Nullable Throwable error) {
statusString = Code.UNKNOWN.toString();
}
- return TagValue.create(statusString);
+ return statusString;
}
/**
* Await the result of the future and convert it into a value that can be used as an OpenCensus
* tag value.
*/
- static TagValue extractStatus(Future> future) {
+ static TagValue extractStatusFromFuture(Future> future) {
Throwable error = null;
try {
@@ -88,7 +98,25 @@ static TagValue extractStatus(Future> future) {
} catch (RuntimeException e) {
error = e;
}
- return extractStatus(error);
+ return TagValue.create(extractStatus(error));
+ }
+
+ static String extractTableId(Object request) {
+ String tableName = null;
+ if (request instanceof ReadRowsRequest) {
+ tableName = ((ReadRowsRequest) request).getTableName();
+ } else if (request instanceof MutateRowsRequest) {
+ tableName = ((MutateRowsRequest) request).getTableName();
+ } else if (request instanceof MutateRowRequest) {
+ tableName = ((MutateRowRequest) request).getTableName();
+ } else if (request instanceof SampleRowKeysRequest) {
+ tableName = ((SampleRowKeysRequest) request).getTableName();
+ } else if (request instanceof CheckAndMutateRowRequest) {
+ tableName = ((CheckAndMutateRowRequest) request).getTableName();
+ } else if (request instanceof ReadModifyWriteRowRequest) {
+ tableName = ((ReadModifyWriteRowRequest) request).getTableName();
+ }
+ return !Strings.isNullOrEmpty(tableName) ? TableName.parse(tableName).getTable() : "undefined";
}
/**
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
new file mode 100644
index 0000000000..3a6f64a124
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.api.client.util.Lists;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ClientContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
+import com.google.cloud.bigtable.stats.StatsWrapper;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.ForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.threeten.bp.Duration;
+
+public class BuiltinMetricsTracerTest {
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+ private static final String UNDEFINED = "undefined";
+ private static final long FAKE_SERVER_TIMING = 50;
+ private static final long SERVER_LATENCY = 500;
+
+ private final AtomicInteger rpcCount = new AtomicInteger(0);
+
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_1 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-1"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_2 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-2"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+ private static final ReadRowsResponse READ_ROWS_RESPONSE_3 =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ ReadRowsResponse.CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key-3"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ private FakeServiceHelper serviceHelper;
+
+ private BigtableGrpc.BigtableImplBase mockService;
+
+ private EnhancedBigtableStub stub;
+
+ private StatsRecorderWrapper statsRecorderWrapper;
+ private ArgumentCaptor longValue;
+ private ArgumentCaptor intValue;
+ private ArgumentCaptor status;
+ private ArgumentCaptor tableId;
+ private ArgumentCaptor zone;
+ private ArgumentCaptor cluster;
+
+ private Stopwatch serverRetryDelayStopwatch;
+ private AtomicLong serverTotalRetryDelay;
+
+ @Before
+ public void setUp() throws Exception {
+ mockService = new FakeService();
+
+ serverRetryDelayStopwatch = Stopwatch.createUnstarted();
+ serverTotalRetryDelay = new AtomicLong(0);
+
+ // Add an interceptor to send location information in the trailers and add server-timing in
+ // headers
+ ServerInterceptor trailersInterceptor =
+ new ServerInterceptor() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public ServerCall.Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ return serverCallHandler.startCall(
+ new ForwardingServerCall.SimpleForwardingServerCall(serverCall) {
+ @Override
+ public void sendHeaders(Metadata headers) {
+ headers.put(
+ Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
+ String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING));
+ super.sendHeaders(headers);
+ }
+
+ @Override
+ public void close(Status status, Metadata trailers) {
+ super.close(status, trailers);
+ }
+ },
+ metadata);
+ }
+ };
+
+ serviceHelper = new FakeServiceHelper(trailersInterceptor, mockService);
+
+ statsRecorderWrapper = Mockito.mock(StatsRecorderWrapper.class);
+
+ BigtableDataSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+ EnhancedBigtableStubSettings.Builder stubSettingsBuilder =
+ settings.getStubSettings().toBuilder();
+ stubSettingsBuilder
+ .mutateRowSettings()
+ .retrySettings()
+ .setInitialRetryDelay(Duration.ofMillis(200));
+ stubSettingsBuilder.setTracerFactory(
+ BuiltinMetricsTracerFactory.create(
+ StatsWrapper.get(), ImmutableMap.of(), statsRecorderWrapper));
+
+ EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
+ stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
+
+ serviceHelper.start();
+
+ longValue = ArgumentCaptor.forClass(long.class);
+ intValue = ArgumentCaptor.forClass(int.class);
+ status = ArgumentCaptor.forClass(String.class);
+ tableId = ArgumentCaptor.forClass(String.class);
+ zone = ArgumentCaptor.forClass(String.class);
+ cluster = ArgumentCaptor.forClass(String.class);
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ serviceHelper.shutdown();
+ }
+
+ @Test
+ public void testOperationLatencies() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator());
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ verify(statsRecorderWrapper).putOperationLatencies(longValue.capture());
+
+ assertThat(longValue.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed));
+ }
+
+ @Test
+ public void testGfeMetrics() {
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ verify(statsRecorderWrapper).putGfeLatencies(longValue.capture());
+ assertThat(longValue.getValue()).isEqualTo(FAKE_SERVER_TIMING);
+
+ verify(statsRecorderWrapper).putGfeMissingHeaders(longValue.capture());
+ assertThat(longValue.getValue()).isEqualTo(0);
+ }
+
+ @Test
+ public void testReadRowsApplicationLatency() throws Exception {
+ final long applicationLatency = 1000;
+ final SettableApiFuture future = SettableApiFuture.create();
+ final AtomicInteger counter = new AtomicInteger(0);
+ // We want to measure how long application waited before requesting another message after
+ // the previous message is returned from the server. Using ResponseObserver here so that the
+ // flow will be
+ // onResponse() -> sleep -> onRequest() (for the next message) which is exactly what we want to
+ // measure for
+ // application latency.
+ // If we do readRowsCallable().call(Query.create(TABLE_ID)).iterator() and iterate through the
+ // iterator and sleep in
+ // between responses, when the call started, the client will pre-fetch the first response, which
+ // won't be counted
+ // in application latency. So the test will be flaky and hard to debug.
+ stub.readRowsCallable()
+ .call(
+ Query.create(TABLE_ID),
+ new ResponseObserver() {
+ @Override
+ public void onStart(StreamController streamController) {}
+
+ @Override
+ public void onResponse(Row row) {
+ try {
+ counter.incrementAndGet();
+ Thread.sleep(applicationLatency);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.setException(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ future.set(null);
+ }
+ });
+ future.get();
+
+ verify(statsRecorderWrapper).putApplicationLatencies(longValue.capture());
+
+ assertThat(counter.get()).isGreaterThan(0);
+ assertThat(longValue.getValue()).isAtLeast(applicationLatency * counter.get());
+ assertThat(longValue.getValue()).isLessThan(applicationLatency * (counter.get() + 1));
+ }
+
+ @Test
+ public void testRetryCount() {
+ stub.mutateRowCallable()
+ .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));
+
+ verify(statsRecorderWrapper).putRetryCount(intValue.capture());
+
+ assertThat(intValue.getValue()).isEqualTo(3);
+ }
+
+ @Test
+ public void testMutateRowAttempts() {
+ stub.mutateRowCallable()
+ .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));
+
+ verify(statsRecorderWrapper, times(4))
+ .record(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+ assertThat(zone.getAllValues().get(0)).isEqualTo(UNDEFINED);
+ assertThat(zone.getAllValues().get(1)).isEqualTo(UNDEFINED);
+ assertThat(zone.getAllValues().get(2)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(0)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(1)).isEqualTo(UNDEFINED);
+ assertThat(cluster.getAllValues().get(2)).isEqualTo(UNDEFINED);
+ assertThat(status.getAllValues().get(0)).isEqualTo("UNAVAILABLE");
+ assertThat(status.getAllValues().get(1)).isEqualTo("UNAVAILABLE");
+ assertThat(status.getAllValues().get(2)).isEqualTo("OK");
+ }
+
+ private class FakeService extends BigtableGrpc.BigtableImplBase {
+
+ @Override
+ public void readRows(
+ ReadRowsRequest request, StreamObserver responseObserver) {
+ try {
+ Thread.sleep(SERVER_LATENCY);
+ } catch (InterruptedException e) {
+ }
+ responseObserver.onNext(READ_ROWS_RESPONSE_1);
+ responseObserver.onNext(READ_ROWS_RESPONSE_2);
+ responseObserver.onNext(READ_ROWS_RESPONSE_3);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void mutateRow(
+ MutateRowRequest request, StreamObserver responseObserver) {
+ if (serverRetryDelayStopwatch.isRunning()) {
+ serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS));
+ serverRetryDelayStopwatch.reset();
+ }
+ if (rpcCount.get() < 2) {
+ responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ rpcCount.getAndIncrement();
+ serverRetryDelayStopwatch.start();
+ return;
+ }
+ responseObserver.onNext(MutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
index 69a741d0e3..0de14636c6 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
@@ -23,6 +23,7 @@
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracer.Scope;
+import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.misc_utilities.MethodComparator;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
@@ -118,11 +119,12 @@ public void testConnectionSelected() {
@Test
public void testAttemptStarted() {
- compositeTracer.attemptStarted(3);
- verify(child1, times(1)).attemptStarted(3);
- verify(child2, times(1)).attemptStarted(3);
- verify(child3, times(1)).attemptStarted(3);
- verify(child4, times(1)).attemptStarted(3);
+ ReadRowsRequest request = ReadRowsRequest.getDefaultInstance();
+ compositeTracer.attemptStarted(request, 3);
+ verify(child1, times(1)).attemptStarted(request, 3);
+ verify(child2, times(1)).attemptStarted(request, 3);
+ verify(child3, times(1)).attemptStarted(request, 3);
+ verify(child4, times(1)).attemptStarted(request, 3);
}
@Test
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
index efef3b67d2..3c0fb4e617 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
@@ -30,13 +30,13 @@
public class UtilTest {
@Test
public void testOk() {
- TagValue tagValue = Util.extractStatus((Throwable) null);
+ TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null));
assertThat(tagValue.asString()).isEqualTo("OK");
}
@Test
public void testOkFuture() {
- TagValue tagValue = Util.extractStatus(Futures.immediateFuture(null));
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFuture(null));
assertThat(tagValue.asString()).isEqualTo("OK");
}
@@ -45,7 +45,7 @@ public void testError() {
DeadlineExceededException error =
new DeadlineExceededException(
"Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
- TagValue tagValue = Util.extractStatus(error);
+ TagValue tagValue = TagValue.create(Util.extractStatus(error));
assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
}
@@ -54,13 +54,13 @@ public void testErrorFuture() {
DeadlineExceededException error =
new DeadlineExceededException(
"Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
- TagValue tagValue = Util.extractStatus(Futures.immediateFailedFuture(error));
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFailedFuture(error));
assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
}
@Test
public void testCancelledFuture() {
- TagValue tagValue = Util.extractStatus(Futures.immediateCancelledFuture());
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateCancelledFuture());
assertThat(tagValue.asString()).isEqualTo("CANCELLED");
}
}