diff --git a/google-cloud-bigtable-stats/clirr-ignored-differences.xml b/google-cloud-bigtable-stats/clirr-ignored-differences.xml
index ff42f58da4..a920751495 100644
--- a/google-cloud-bigtable-stats/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable-stats/clirr-ignored-differences.xml
@@ -13,4 +13,10 @@
com/google/cloud/bigtable/stats/StatsRecorderWrapper
void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
+
+
+ 7002
+ com/google/cloud/bigtable/stats/StatsRecorderWrapper
+ void putBatchRequestThrottled(long)
+
diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java
index eac556502d..88eab077c3 100644
--- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java
+++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java
@@ -115,8 +115,8 @@ public void putGfeMissingHeaders(long connectivityErrors) {
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}
- public void putBatchRequestThrottled(long throttledTimeMs) {
- operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
+ public void putClientBlockingLatencies(long clientBlockingLatency) {
+ operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, clientBlockingLatency);
}
private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
diff --git a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java
index a878fc96da..b68e4f1a1b 100644
--- a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java
+++ b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java
@@ -91,7 +91,7 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
- recorderWrapper.putBatchRequestThrottled(throttlingLatency);
+ recorderWrapper.putClientBlockingLatencies(throttlingLatency);
recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);
@@ -290,7 +290,7 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
- recorderWrapper.putBatchRequestThrottled(throttlingLatency);
+ recorderWrapper.putClientBlockingLatencies(throttlingLatency);
recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 57c6d3337c..820dc7c652 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -87,6 +87,7 @@
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
+import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
@@ -509,7 +510,7 @@ private UnaryCallable> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());
UnaryCallable> withBigtableTracer =
- new BigtableTracerUnaryCallable<>(tracedBatcher);
+ new BigtableTracerBatchedUnaryCallable<>(tracedBatcher);
UnaryCallable> traced =
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span);
@@ -641,7 +642,7 @@ private UnaryCallable createBulkMutateRowsCallable() {
new TracedBatcherUnaryCallable<>(userFacing);
UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable);
+ new BigtableTracerBatchedUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable traced =
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java
new file mode 100644
index 0000000000..1cda49934c
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.common.base.Stopwatch;
+import io.grpc.Attributes;
+import io.grpc.ClientStreamTracer;
+import io.grpc.Metadata;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
+ * tracing and Bigtable tracing. Its primary purpose is to measure the transition time between
+ * asking gRPC to start an RPC and gRPC actually serializing that RPC.
+ */
+class BigtableGrpcStreamTracer extends ClientStreamTracer {
+
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private final BigtableTracer tracer;
+
+ public BigtableGrpcStreamTracer(BigtableTracer tracer) {
+ this.tracer = tracer;
+ }
+
+ @Override
+ public void streamCreated(Attributes transportAttrs, Metadata headers) {
+ stopwatch.start();
+ }
+
+ @Override
+ public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
+ tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ static class Factory extends ClientStreamTracer.Factory {
+
+ private final BigtableTracer tracer;
+
+ Factory(BigtableTracer tracer) {
+ this.tracer = tracer;
+ }
+
+ @Override
+ public ClientStreamTracer newClientStreamTracer(
+ ClientStreamTracer.StreamInfo info, Metadata headers) {
+ return new BigtableGrpcStreamTracer(tracer);
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index 2640cc1ced..3445514f7b 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -82,4 +82,8 @@ public void batchRequestThrottled(long throttledTimeMs) {
public void setLocations(String zone, String cluster) {
// noop
}
+
+ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java
new file mode 100644
index 0000000000..06722aaea5
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcResponseMetadata;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.common.util.concurrent.MoreExecutors;
+import javax.annotation.Nonnull;
+
+/**
+ * This callable will do everything described in {@link BigtableTracerUnaryCallable} except that it
+ * won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, we only want to calculate
+ * the total time client is blocked because of flow control.
+ */
+@InternalApi
+public class BigtableTracerBatchedUnaryCallable
+ extends BigtableTracerUnaryCallable {
+
+ private UnaryCallable innerCallable;
+
+ public BigtableTracerBatchedUnaryCallable(
+ @Nonnull UnaryCallable innerCallable) {
+ super(innerCallable);
+ this.innerCallable = innerCallable;
+ }
+
+ @Override
+ public ApiFuture futureCall(RequestT request, ApiCallContext context) {
+ final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
+ BigtableTracerUnaryCallback callback =
+ new BigtableTracerUnaryCallback(
+ (BigtableTracer) context.getTracer(), responseMetadata);
+ ApiFuture future =
+ innerCallable.futureCall(request, responseMetadata.addHandlers(context));
+ ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
+ return future;
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
index 17c968c60f..167cd0dc2e 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -37,6 +37,8 @@
* -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* -Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
+ * -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
+ * RPC spent in a grpc channel queue.
* This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@@ -60,7 +62,11 @@ public void call(
BigtableTracerResponseObserver innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
- innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
+ innerCallable.call(
+ request,
+ innerObserver,
+ Util.injectBigtableStreamTracer(
+ context, responseMetadata, (BigtableTracer) context.getTracer()));
} else {
innerCallable.call(request, responseObserver, context);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
index 4b73a34797..e5ec7b806b 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
@@ -35,6 +35,8 @@
* the gfe_header_missing_counter in this case.
* -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
+ * -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
+ * RPC spent in a grpc channel queue.
* This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@@ -49,14 +51,18 @@ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable i
}
@Override
- public ApiFuture futureCall(RequestT request, ApiCallContext context) {
+ public ApiFuture futureCall(RequestT request, ApiCallContext context) {
// tracer should always be an instance of BigtableTracer
if (context.getTracer() instanceof BigtableTracer) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
- final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context);
- BigtableTracerUnaryCallback callback =
- new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
- ApiFuture future = innerCallable.futureCall(request, contextWithResponseMetadata);
+ BigtableTracerUnaryCallback callback =
+ new BigtableTracerUnaryCallback(
+ (BigtableTracer) context.getTracer(), responseMetadata);
+ ApiFuture future =
+ innerCallable.futureCall(
+ request,
+ Util.injectBigtableStreamTracer(
+ context, responseMetadata, (BigtableTracer) context.getTracer()));
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
} else {
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
index e592d0038f..a8b8148d3e 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
@@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
private String zone = "global";
private String cluster = "unspecified";
+ private AtomicLong totalClientBlockingTime = new AtomicLong(0);
+
@VisibleForTesting
BuiltinMetricsTracer(
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
@@ -219,7 +221,12 @@ public void setLocations(String zone, String cluster) {
@Override
public void batchRequestThrottled(long throttledTimeMs) {
- recorder.putBatchRequestThrottled(throttledTimeMs);
+ totalClientBlockingTime.addAndGet(throttledTimeMs);
+ }
+
+ @Override
+ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
+ totalClientBlockingTime.addAndGet(queuedTimeMs);
}
@Override
@@ -266,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}
+ recorder.putClientBlockingLatencies(totalClientBlockingTime.get());
+
// Patch the status until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
// so it could get processed by extractStatus
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
index 271782c2f6..774c6d9f22 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
@@ -218,4 +218,11 @@ public void afterResponse(long applicationLatency) {
tracer.afterResponse(applicationLatency);
}
}
+
+ @Override
+ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.grpcChannelQueuedLatencies(queuedTimeMs);
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
index 5b045f15ef..8baf6a15f4 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
@@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;
import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
@@ -32,6 +33,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
+import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
@@ -197,4 +199,23 @@ static void recordMetricsFromMetadata(
// Record gfe metrics
tracer.recordGfeMetadata(latency, throwable);
}
+
+ /**
+ * This method bridges gRPC stream tracing to bigtable tracing by adding a {@link
+ * io.grpc.ClientStreamTracer} to the callContext.
+ */
+ static GrpcCallContext injectBigtableStreamTracer(
+ ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) {
+ if (context instanceof GrpcCallContext) {
+ GrpcCallContext callContext = (GrpcCallContext) context;
+ CallOptions callOptions = callContext.getCallOptions();
+ return responseMetadata.addHandlers(
+ callContext.withCallOptions(
+ callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer))));
+ } else {
+ // context should always be an instance of GrpcCallContext. If not throw an exception
+ // so we can see what class context is.
+ throw new RuntimeException("Unexpected context class: " + context.getClass().getName());
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
index 4d7903dd27..3bc283a7f7 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
@@ -24,10 +24,12 @@
import static org.mockito.Mockito.when;
import com.google.api.client.util.Lists;
+import com.google.api.core.ApiFunction;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.ResponseObserver;
@@ -55,8 +57,15 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingServerCall;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
@@ -101,6 +110,8 @@ public class BuiltinMetricsTracerTest {
private static final long SERVER_LATENCY = 100;
private static final long APPLICATION_LATENCY = 200;
+ private static final long CHANNEL_BLOCKING_LATENCY = 75;
+
@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
private final FakeService fakeService = new FakeService();
@@ -148,6 +159,28 @@ public void sendHeaders(Metadata headers) {
}
};
+ ClientInterceptor clientInterceptor =
+ new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor methodDescriptor,
+ CallOptions callOptions,
+ Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall(
+ channel.newCall(methodDescriptor, callOptions)) {
+ @Override
+ public void sendMessage(ReqT message) {
+ try {
+ Thread.sleep(CHANNEL_BLOCKING_LATENCY);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.sendMessage(message);
+ }
+ };
+ }
+ };
+
server = FakeServiceBuilder.create(fakeService).intercept(trailersInterceptor).start();
BigtableDataSettings settings =
@@ -180,6 +213,23 @@ public void sendHeaders(Metadata headers) {
.build());
stubSettingsBuilder.setTracerFactory(mockFactory);
+ InstantiatingGrpcChannelProvider.Builder channelProvider =
+ ((InstantiatingGrpcChannelProvider) stubSettingsBuilder.getTransportChannelProvider())
+ .toBuilder();
+
+ @SuppressWarnings("rawtypes")
+ final ApiFunction oldConfigurator =
+ channelProvider.getChannelConfigurator();
+
+ channelProvider.setChannelConfigurator(
+ (builder) -> {
+ if (oldConfigurator != null) {
+ builder = oldConfigurator.apply(builder);
+ }
+ return builder.intercept(clientInterceptor);
+ });
+ stubSettingsBuilder.setTransportChannelProvider(channelProvider.build());
+
EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
}
@@ -353,7 +403,7 @@ public void testRetryCount() {
invocationOnMock ->
new BuiltinMetricsTracer(
OperationType.ServerStreaming,
- SpanName.of("Bigtable", "ReadRows"),
+ SpanName.of("Bigtable", "MutateRow"),
statsRecorderWrapper));
ArgumentCaptor retryCount = ArgumentCaptor.forClass(Integer.class);
@@ -418,7 +468,7 @@ public void testReadRowsAttemptsTagValues() {
}
@Test
- public void testClientBlockingLatencies() throws InterruptedException {
+ public void testBatchBlockingLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
@@ -430,8 +480,8 @@ public void testClientBlockingLatencies() throws InterruptedException {
int expectedNumRequests = 6 / batchElementCount;
ArgumentCaptor throttledTime = ArgumentCaptor.forClass(Long.class);
- verify(statsRecorderWrapper, times(expectedNumRequests))
- .putBatchRequestThrottled(throttledTime.capture());
+ verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests))
+ .putClientBlockingLatencies(throttledTime.capture());
// Adding the first 2 elements should not get throttled since the batch is empty
assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0);
@@ -442,6 +492,42 @@ public void testClientBlockingLatencies() throws InterruptedException {
}
}
+ @Test
+ public void testQueuedOnChannelServerStreamLatencies() throws InterruptedException {
+ when(mockFactory.newTracer(any(), any(), any()))
+ .thenReturn(
+ new BuiltinMetricsTracer(
+ OperationType.ServerStreaming,
+ SpanName.of("Bigtable", "ReadRows"),
+ statsRecorderWrapper));
+
+ stub.readRowsCallable().all().call(Query.create(TABLE_ID));
+
+ ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class);
+
+ verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
+ .putClientBlockingLatencies(blockedTime.capture());
+
+ assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
+ }
+
+ @Test
+ public void testQueuedOnChannelUnaryLatencies() throws InterruptedException {
+ when(mockFactory.newTracer(any(), any(), any()))
+ .thenReturn(
+ new BuiltinMetricsTracer(
+ OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper));
+ stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "a-key").setCell("f", "q", "v"));
+
+ ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class);
+
+ verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
+ .putClientBlockingLatencies(blockedTime.capture());
+
+ assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
+ assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
+ }
+
@Test
public void testPermanentFailure() {
when(mockFactory.newTracer(any(), any(), any()))
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
index 0de14636c6..11dd0b5095 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
@@ -251,4 +251,11 @@ public void testMethodsOverride() {
.comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE)
.containsAtLeastElementsIn(baseMethods);
}
+
+ @Test
+ public void testRequestBlockedOnChannel() {
+ compositeTracer.grpcChannelQueuedLatencies(5L);
+ verify(child3, times(1)).grpcChannelQueuedLatencies(5L);
+ verify(child4, times(1)).grpcChannelQueuedLatencies(5L);
+ }
}