From e32b6330a6f081cf500e06e33197aa83234c8b17 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 20 Dec 2022 13:11:53 -0500 Subject: [PATCH 1/9] feat: track the latency a request is queued on the grpc channel --- .../bigtable/stats/StatsRecorderWrapper.java | 4 ++ .../stats/StatsRecorderWrapperTest.java | 2 + .../metrics/BigtableGrpcStreamTracer.java | 43 +++++++++++++++++++ .../data/v2/stub/metrics/BigtableTracer.java | 4 ++ .../BigtableTracerStreamingCallable.java | 7 ++- .../metrics/BigtableTracerUnaryCallable.java | 6 ++- .../v2/stub/metrics/BuiltinMetricsTracer.java | 6 +++ .../data/v2/stub/metrics/CompositeTracer.java | 7 +++ .../bigtable/data/v2/stub/metrics/Util.java | 16 +++++++ .../metrics/BigtableTracerCallableTest.java | 3 +- .../v2/stub/metrics/CompositeTracerTest.java | 7 +++ 11 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java index eac556502d..989f1c9db7 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java @@ -119,6 +119,10 @@ public void putBatchRequestThrottled(long throttledTimeMs) { operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs); } + public void putRequestBlockedOnChannel(long blockedTime) { + attemptMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, blockedTime); + } + private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) { TagContextBuilder tagContextBuilder = tagger diff --git a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java index a878fc96da..edb89f39cc 100644 --- a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java +++ b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java @@ -92,6 +92,7 @@ public void testStreamingOperation() throws InterruptedException { recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); recorderWrapper.putBatchRequestThrottled(throttlingLatency); + recorderWrapper.putRequestBlockedOnChannel(throttlingLatency); recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER); @@ -291,6 +292,7 @@ public void testUnaryOperations() throws InterruptedException { recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); recorderWrapper.putBatchRequestThrottled(throttlingLatency); + recorderWrapper.putRequestBlockedOnChannel(throttlingLatency); recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java new file mode 100644 index 0000000000..f179b4966b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -0,0 +1,43 @@ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.common.base.Stopwatch; +import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.Metadata; + +import java.util.concurrent.TimeUnit; + +/** Records the time a request is blocked on the grpc channel */ +class BigtableGrpcStreamTracer extends ClientStreamTracer { + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + private final BigtableTracer tracer; + + public BigtableGrpcStreamTracer(BigtableTracer tracer) { + this.tracer = tracer; + } + + @Override + public void streamCreated(Attributes transportAttrs, Metadata headers) { + stopwatch.start(); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + tracer.requestBlockedOnChannel(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + + static class Factory extends ClientStreamTracer.Factory { + + private final BigtableTracer tracer; + + Factory(BigtableTracer tracer) { + this.tracer = tracer; + } + + @Override + public ClientStreamTracer newClientStreamTracer(ClientStreamTracer.StreamInfo info, Metadata headers) { + return new BigtableGrpcStreamTracer(tracer); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 2640cc1ced..6b4c2b6251 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -82,4 +82,8 @@ public void batchRequestThrottled(long throttledTimeMs) { public void setLocations(String zone, String cluster) { // noop } + + public void requestBlockedOnChannel(long blockedTimeMs) { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index 17c968c60f..f7a8474aec 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcResponseMetadata; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; @@ -24,6 +25,9 @@ import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import io.grpc.ClientStreamTracer; + +import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -60,7 +64,8 @@ public void call( BigtableTracerResponseObserver innerObserver = new BigtableTracerResponseObserver<>( responseObserver, (BigtableTracer) context.getTracer(), responseMetadata); - innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context)); + innerCallable.call(request, innerObserver, + Util.addHandlerToCallContext(context, responseMetadata, (BigtableTracer) context.getTracer())); } else { innerCallable.call(request, responseObserver, context); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index 4b73a34797..6ac347df39 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcResponseMetadata; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; @@ -53,10 +54,11 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { // tracer should always be an instance of BigtableTracer if (context.getTracer() instanceof BigtableTracer) { final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); - final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context); BigtableTracerUnaryCallback callback = new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); - ApiFuture future = innerCallable.futureCall(request, contextWithResponseMetadata); + ApiFuture future = + innerCallable.futureCall(request, + Util.addHandlerToCallContext(context, responseMetadata, (BigtableTracer) context.getTracer())); ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); return future; } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index d1b6a4b53e..c1c9325e89 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; + import org.threeten.bp.Duration; /** @@ -225,6 +226,11 @@ public void batchRequestThrottled(long throttledTimeMs) { recorder.putBatchRequestThrottled(throttledTimeMs); } + @Override + public void requestBlockedOnChannel(long blockedTimeMs) { + recorder.putRequestBlockedOnChannel(blockedTimeMs); + } + @Override public void disableFlowControl() { flowControlIsDisabled = true; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 271782c2f6..b05d3832b3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -218,4 +218,11 @@ public void afterResponse(long applicationLatency) { tracer.afterResponse(applicationLatency); } } + + @Override + public void requestBlockedOnChannel(long blockedTimeMs) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.requestBlockedOnChannel(blockedTimeMs); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index 5b045f15ef..3aa6d456c3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcResponseMetadata; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; @@ -32,6 +33,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; @@ -197,4 +199,18 @@ static void recordMetricsFromMetadata( // Record gfe metrics tracer.recordGfeMetadata(latency, throwable); } + + static GrpcCallContext addHandlerToCallContext( + ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) { + if (context instanceof GrpcCallContext) { + // context should always be an instance of GrpcCallContext. Sanity check just in case. + GrpcCallContext callContext = (GrpcCallContext) context; + CallOptions callOptions = callContext.getCallOptions(); + return responseMetadata.addHandlers( + callContext.withCallOptions(callOptions.withStreamTracerFactory( + new BigtableGrpcStreamTracer.Factory(tracer)))); + } else { + return responseMetadata.addHandlers(context); + } + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index 1b833f5c06..84f82f8809 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -45,6 +45,7 @@ import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.collect.ImmutableMap; +import io.grpc.ClientInterceptor; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.Server; @@ -360,7 +361,7 @@ public void testGFEMissingHeaderMetric() throws InterruptedException { @Test public void testMetricsWithErrorResponse() throws InterruptedException { try { - stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next(); + stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator().next(); fail("readrows should throw exception"); } catch (Exception e) { assertThat(e).isInstanceOf(UnavailableException.class); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index 0de14636c6..85b458264b 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -251,4 +251,11 @@ public void testMethodsOverride() { .comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE) .containsAtLeastElementsIn(baseMethods); } + + @Test + public void testRequestBlockedOnChannel() { + compositeTracer.requestBlockedOnChannel(5L); + verify(child3, times(1)).requestBlockedOnChannel(5L); + verify(child4, times(1)).requestBlockedOnChannel(5L); + } } From c2e21fba04002b99ae06e875f78ef847412a61af Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 25 Jan 2023 15:26:15 -0500 Subject: [PATCH 2/9] add tests --- .../data/v2/stub/EnhancedBigtableStub.java | 4 +- .../metrics/BigtableGrpcStreamTracer.java | 46 ++++----- .../BigtableTracerStreamingCallable.java | 22 +++-- .../metrics/BigtableTracerUnaryCallable.java | 21 +++- .../v2/stub/metrics/BuiltinMetricsTracer.java | 1 - .../bigtable/data/v2/stub/metrics/Util.java | 6 +- .../metrics/BigtableTracerCallableTest.java | 3 +- .../metrics/BuiltinMetricsTracerTest.java | 96 ++++++++++++++++++- 8 files changed, 156 insertions(+), 43 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index e8cec34e84..bbb9511fb2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -484,7 +484,7 @@ private UnaryCallable> createBulkReadRowsCallable( new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); UnaryCallable> withBigtableTracer = - new BigtableTracerUnaryCallable<>(tracedBatcher); + new BigtableTracerUnaryCallable<>(tracedBatcher, true); UnaryCallable> traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span); @@ -616,7 +616,7 @@ private UnaryCallable createBulkMutateRowsCallable() { new TracedBatcherUnaryCallable<>(userFacing); UnaryCallable withBigtableTracer = - new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable); + new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable, true); UnaryCallable traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java index f179b4966b..5800b30ef4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -4,40 +4,40 @@ import io.grpc.Attributes; import io.grpc.ClientStreamTracer; import io.grpc.Metadata; - import java.util.concurrent.TimeUnit; /** Records the time a request is blocked on the grpc channel */ class BigtableGrpcStreamTracer extends ClientStreamTracer { - private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - private final BigtableTracer tracer; + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + private final BigtableTracer tracer; - public BigtableGrpcStreamTracer(BigtableTracer tracer) { - this.tracer = tracer; - } + public BigtableGrpcStreamTracer(BigtableTracer tracer) { + this.tracer = tracer; + } - @Override - public void streamCreated(Attributes transportAttrs, Metadata headers) { - stopwatch.start(); - } + @Override + public void streamCreated(Attributes transportAttrs, Metadata headers) { + stopwatch.start(); + } - @Override - public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { - tracer.requestBlockedOnChannel(stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + tracer.requestBlockedOnChannel(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } - static class Factory extends ClientStreamTracer.Factory { + static class Factory extends ClientStreamTracer.Factory { - private final BigtableTracer tracer; + private final BigtableTracer tracer; - Factory(BigtableTracer tracer) { - this.tracer = tracer; - } + Factory(BigtableTracer tracer) { + this.tracer = tracer; + } - @Override - public ClientStreamTracer newClientStreamTracer(ClientStreamTracer.StreamInfo info, Metadata headers) { - return new BigtableGrpcStreamTracer(tracer); - } + @Override + public ClientStreamTracer newClientStreamTracer( + ClientStreamTracer.StreamInfo info, Metadata headers) { + return new BigtableGrpcStreamTracer(tracer); } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index f7a8474aec..510f9ed815 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -16,7 +16,6 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcResponseMetadata; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; @@ -25,9 +24,6 @@ import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import io.grpc.ClientStreamTracer; - -import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -49,10 +45,17 @@ public class BigtableTracerStreamingCallable extends ServerStreamingCallable { private final ServerStreamingCallable innerCallable; + private final boolean batching; public BigtableTracerStreamingCallable( @Nonnull ServerStreamingCallable callable) { + this(callable, false); + } + + public BigtableTracerStreamingCallable( + @Nonnull ServerStreamingCallable callable, boolean batching) { this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set"); + this.batching = batching; } @Override @@ -64,8 +67,15 @@ public void call( BigtableTracerResponseObserver innerObserver = new BigtableTracerResponseObserver<>( responseObserver, (BigtableTracer) context.getTracer(), responseMetadata); - innerCallable.call(request, innerObserver, - Util.addHandlerToCallContext(context, responseMetadata, (BigtableTracer) context.getTracer())); + if (batching) { + innerCallable.call(request, innerObserver, context); + } else { + innerCallable.call( + request, + innerObserver, + Util.addHandlerToCallContext( + context, responseMetadata, (BigtableTracer) context.getTracer())); + } } else { innerCallable.call(request, responseObserver, context); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index 6ac347df39..5f6a0b074c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -19,7 +19,6 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcResponseMetadata; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; @@ -44,9 +43,16 @@ public class BigtableTracerUnaryCallable extends UnaryCallable { private final UnaryCallable innerCallable; + private final boolean batching; public BigtableTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) { + this(innerCallable, false); + } + + public BigtableTracerUnaryCallable( + @Nonnull UnaryCallable innerCallable, boolean batching) { this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); + this.batching = batching; } @Override @@ -56,9 +62,16 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); BigtableTracerUnaryCallback callback = new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); - ApiFuture future = - innerCallable.futureCall(request, - Util.addHandlerToCallContext(context, responseMetadata, (BigtableTracer) context.getTracer())); + ApiFuture future; + if (batching) { + future = innerCallable.futureCall(request, context); + } else { + future = + innerCallable.futureCall( + request, + Util.addHandlerToCallContext( + context, responseMetadata, (BigtableTracer) context.getTracer())); + } ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); return future; } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index c1c9325e89..bbcfab4483 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; - import org.threeten.bp.Duration; /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index 3aa6d456c3..37cae4e48c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -201,14 +201,14 @@ static void recordMetricsFromMetadata( } static GrpcCallContext addHandlerToCallContext( - ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) { + ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) { if (context instanceof GrpcCallContext) { // context should always be an instance of GrpcCallContext. Sanity check just in case. GrpcCallContext callContext = (GrpcCallContext) context; CallOptions callOptions = callContext.getCallOptions(); return responseMetadata.addHandlers( - callContext.withCallOptions(callOptions.withStreamTracerFactory( - new BigtableGrpcStreamTracer.Factory(tracer)))); + callContext.withCallOptions( + callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer)))); } else { return responseMetadata.addHandlers(context); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index 84f82f8809..1b833f5c06 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -45,7 +45,6 @@ import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.collect.ImmutableMap; -import io.grpc.ClientInterceptor; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.Server; @@ -361,7 +360,7 @@ public void testGFEMissingHeaderMetric() throws InterruptedException { @Test public void testMetricsWithErrorResponse() throws InterruptedException { try { - stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator().next(); + stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next(); fail("readrows should throw exception"); } catch (Exception e) { assertThat(e).isInstanceOf(UnavailableException.class); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index a2c6d417b3..61d5398145 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -24,10 +24,12 @@ import static org.mockito.Mockito.when; import com.google.api.client.util.Lists; +import com.google.api.core.ApiFunction; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.NotFoundException; import com.google.api.gax.rpc.ResponseObserver; @@ -55,8 +57,15 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; import io.grpc.ForwardingServerCall; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -101,6 +110,8 @@ public class BuiltinMetricsTracerTest { private static final long SERVER_LATENCY = 100; private static final long APPLICATION_LATENCY = 200; + private static final long CHANNEL_BLOCKING_LATENCY = 75; + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); private final FakeService fakeService = new FakeService(); @@ -148,6 +159,28 @@ public void sendHeaders(Metadata headers) { } }; + ClientInterceptor clientInterceptor = + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + @Override + public void sendMessage(ReqT message) { + try { + Thread.sleep(CHANNEL_BLOCKING_LATENCY); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + super.sendMessage(message); + } + }; + } + }; + server = FakeServiceBuilder.create(fakeService).intercept(trailersInterceptor).start(); BigtableDataSettings settings = @@ -180,6 +213,29 @@ public void sendHeaders(Metadata headers) { .build()); stubSettingsBuilder.setTracerFactory(mockFactory); + InstantiatingGrpcChannelProvider.Builder channelProvider = + ((InstantiatingGrpcChannelProvider) stubSettingsBuilder.getTransportChannelProvider()) + .toBuilder(); + + @SuppressWarnings("rawtypes") + final ApiFunction oldConfigurator = + channelProvider.getChannelConfigurator(); + + @SuppressWarnings("rawtypes") + final ApiFunction newConfigurator = + new ApiFunction() { + @Override + @SuppressWarnings("rawtypes") + public ManagedChannelBuilder apply(ManagedChannelBuilder builder) { + if (oldConfigurator != null) { + builder = oldConfigurator.apply(builder); + } + return builder.intercept(clientInterceptor); + } + }; + channelProvider.setChannelConfigurator(newConfigurator); + stubSettingsBuilder.setTransportChannelProvider(channelProvider.build()); + EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build(); stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); } @@ -353,7 +409,7 @@ public void testRetryCount() { invocationOnMock -> new BuiltinMetricsTracer( OperationType.ServerStreaming, - SpanName.of("Bigtable", "ReadRows"), + SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper)); ArgumentCaptor retryCount = ArgumentCaptor.forClass(Integer.class); @@ -418,7 +474,7 @@ public void testReadRowsAttemptsTagValues() { } @Test - public void testClientBlockingLatencies() throws InterruptedException { + public void testBatchBlockingLatencies() throws InterruptedException { when(mockFactory.newTracer(any(), any(), any())) .thenReturn( new BuiltinMetricsTracer( @@ -442,6 +498,42 @@ public void testClientBlockingLatencies() throws InterruptedException { } } + @Test + public void testBlockedOnChannelServerStreamLatencies() throws InterruptedException { + when(mockFactory.newTracer(any(), any(), any())) + .thenReturn( + new BuiltinMetricsTracer( + OperationType.ServerStreaming, + SpanName.of("Bigtable", "ReadRows"), + statsRecorderWrapper)); + + stub.readRowsCallable().all().call(Query.create(TABLE_ID)); + + ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); + + verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) + .putRequestBlockedOnChannel(blockedTime.capture()); + + assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); + } + + @Test + public void testBlockedOnChannelUnaryLatencies() throws InterruptedException { + when(mockFactory.newTracer(any(), any(), any())) + .thenReturn( + new BuiltinMetricsTracer( + OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper)); + stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "a-key").setCell("f", "q", "v")); + + ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); + + verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) + .putRequestBlockedOnChannel(blockedTime.capture()); + + assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); + assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY); + } + @Test public void testPermanentFailure() { when(mockFactory.newTracer(any(), any(), any())) From 6fb5fbac02474621b147dd3be45807ad283dc110 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 25 Jan 2023 15:49:41 -0500 Subject: [PATCH 3/9] reformat, fixing tests --- .../bigtable/stats/StatsRecorderWrapper.java | 1 + .../v2/stub/metrics/BigtableGrpcStreamTracer.java | 15 +++++++++++++++ .../metrics/BigtableTracerStreamingCallable.java | 2 +- .../stub/metrics/BigtableTracerUnaryCallable.java | 2 +- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java index 989f1c9db7..759829ae0f 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java @@ -40,6 +40,7 @@ public class StatsRecorderWrapper { private final SpanName spanName; private final Map statsAttributes; + private volatile long accumulatedLatency = 0; private MeasureMap attemptMeasureMap; private MeasureMap operationMeasureMap; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java index 5800b30ef4..c78bc4fec3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -1,3 +1,18 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.common.base.Stopwatch; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index 510f9ed815..6abb7f0980 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -68,7 +68,7 @@ public void call( new BigtableTracerResponseObserver<>( responseObserver, (BigtableTracer) context.getTracer(), responseMetadata); if (batching) { - innerCallable.call(request, innerObserver, context); + innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context)); } else { innerCallable.call( request, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index 5f6a0b074c..f18dcffbb6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -64,7 +64,7 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); ApiFuture future; if (batching) { - future = innerCallable.futureCall(request, context); + future = innerCallable.futureCall(request, responseMetadata.addHandlers(context)); } else { future = innerCallable.futureCall( From cf79054c7f13cd804857297ff355c2302fb7cd42 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 17 Apr 2023 13:53:58 -0400 Subject: [PATCH 4/9] address comments --- .../bigtable/stats/StatsRecorderWrapper.java | 2 +- .../stats/StatsRecorderWrapperTest.java | 4 +- .../data/v2/stub/EnhancedBigtableStub.java | 5 +- .../metrics/BigtableGrpcStreamTracer.java | 8 ++- .../data/v2/stub/metrics/BigtableTracer.java | 2 +- .../BigtableTracerBatchedCallable.java | 54 +++++++++++++++++++ .../BigtableTracerStreamingCallable.java | 2 +- .../metrics/BigtableTracerUnaryCallable.java | 31 ++++------- .../v2/stub/metrics/BuiltinMetricsTracer.java | 4 +- .../data/v2/stub/metrics/CompositeTracer.java | 4 +- .../bigtable/data/v2/stub/metrics/Util.java | 6 ++- .../metrics/BuiltinMetricsTracerTest.java | 4 +- .../v2/stub/metrics/CompositeTracerTest.java | 6 +-- 13 files changed, 93 insertions(+), 39 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java index 759829ae0f..7ae287cff9 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java @@ -120,7 +120,7 @@ public void putBatchRequestThrottled(long throttledTimeMs) { operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs); } - public void putRequestBlockedOnChannel(long blockedTime) { + public void putGrpcChannelQueuedLatencies(long blockedTime) { attemptMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, blockedTime); } diff --git a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java index edb89f39cc..80d4eaefa0 100644 --- a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java +++ b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java @@ -92,7 +92,7 @@ public void testStreamingOperation() throws InterruptedException { recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); recorderWrapper.putBatchRequestThrottled(throttlingLatency); - recorderWrapper.putRequestBlockedOnChannel(throttlingLatency); + recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency); recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER); @@ -292,7 +292,7 @@ public void testUnaryOperations() throws InterruptedException { recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); recorderWrapper.putBatchRequestThrottled(throttlingLatency); - recorderWrapper.putRequestBlockedOnChannel(throttlingLatency); + recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency); recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 975498c6b5..342dcc1085 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -87,6 +87,7 @@ import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; @@ -509,7 +510,7 @@ private UnaryCallable> createBulkReadRowsCallable( new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); UnaryCallable> withBigtableTracer = - new BigtableTracerUnaryCallable<>(tracedBatcher, true); + new BigtableTracerBatchedCallable<>(tracedBatcher); UnaryCallable> traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span); @@ -641,7 +642,7 @@ private UnaryCallable createBulkMutateRowsCallable() { new TracedBatcherUnaryCallable<>(userFacing); UnaryCallable withBigtableTracer = - new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable, true); + new BigtableTracerBatchedCallable<>(tracedBatcherUnaryCallable); UnaryCallable traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java index c78bc4fec3..1cda49934c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -21,7 +21,11 @@ import io.grpc.Metadata; import java.util.concurrent.TimeUnit; -/** Records the time a request is blocked on the grpc channel */ +/** + * Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream + * tracing and Bigtable tracing. Its primary purpose is to measure the transition time between + * asking gRPC to start an RPC and gRPC actually serializing that RPC. + */ class BigtableGrpcStreamTracer extends ClientStreamTracer { private final Stopwatch stopwatch = Stopwatch.createUnstarted(); @@ -38,7 +42,7 @@ public void streamCreated(Attributes transportAttrs, Metadata headers) { @Override public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { - tracer.requestBlockedOnChannel(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.MILLISECONDS)); } static class Factory extends ClientStreamTracer.Factory { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 6b4c2b6251..3445514f7b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -83,7 +83,7 @@ public void setLocations(String zone, String cluster) { // noop } - public void requestBlockedOnChannel(long blockedTimeMs) { + public void grpcChannelQueuedLatencies(long queuedTimeMs) { // noop } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java new file mode 100644 index 0000000000..111c4d76e8 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcResponseMetadata; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.util.concurrent.MoreExecutors; +import javax.annotation.Nonnull; + +/** + * This callable will do everything described in {@link BigtableTracerUnaryCallable} + * except that it won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, + * we only want to calculate the total time client is blocked because of flow control. + */ +@InternalApi +public class BigtableTracerBatchedCallable + extends BigtableTracerUnaryCallable { + + private UnaryCallable innerCallable; + + public BigtableTracerBatchedCallable(@Nonnull UnaryCallable innerCallable) { + super(innerCallable); + this.innerCallable = innerCallable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); + BigtableTracerUnaryCallback callback = + new BigtableTracerUnaryCallback( + (BigtableTracer) context.getTracer(), responseMetadata); + ApiFuture future = + innerCallable.futureCall(request, responseMetadata.addHandlers(context)); + ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); + return future; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index 6abb7f0980..200393dc74 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -73,7 +73,7 @@ public void call( innerCallable.call( request, innerObserver, - Util.addHandlerToCallContext( + Util.injectBigtableStreamTracer( context, responseMetadata, (BigtableTracer) context.getTracer())); } } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index f18dcffbb6..e5ec7b806b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -35,6 +35,8 @@ * the gfe_header_missing_counter in this case. *
  • -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and * cluster ids. + *
  • -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an + * RPC spent in a grpc channel queue. *
  • This class is considered an internal implementation detail and not meant to be used by * applications. */ @@ -43,35 +45,24 @@ public class BigtableTracerUnaryCallable extends UnaryCallable { private final UnaryCallable innerCallable; - private final boolean batching; public BigtableTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) { - this(innerCallable, false); - } - - public BigtableTracerUnaryCallable( - @Nonnull UnaryCallable innerCallable, boolean batching) { this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - this.batching = batching; } @Override - public ApiFuture futureCall(RequestT request, ApiCallContext context) { + public ApiFuture futureCall(RequestT request, ApiCallContext context) { // tracer should always be an instance of BigtableTracer if (context.getTracer() instanceof BigtableTracer) { final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); - BigtableTracerUnaryCallback callback = - new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); - ApiFuture future; - if (batching) { - future = innerCallable.futureCall(request, responseMetadata.addHandlers(context)); - } else { - future = - innerCallable.futureCall( - request, - Util.addHandlerToCallContext( - context, responseMetadata, (BigtableTracer) context.getTracer())); - } + BigtableTracerUnaryCallback callback = + new BigtableTracerUnaryCallback( + (BigtableTracer) context.getTracer(), responseMetadata); + ApiFuture future = + innerCallable.futureCall( + request, + Util.injectBigtableStreamTracer( + context, responseMetadata, (BigtableTracer) context.getTracer())); ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); return future; } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 29e13ae92e..de048b5ab8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -223,8 +223,8 @@ public void batchRequestThrottled(long throttledTimeMs) { } @Override - public void requestBlockedOnChannel(long blockedTimeMs) { - recorder.putRequestBlockedOnChannel(blockedTimeMs); + public void grpcChannelQueuedLatencies(long queuedTimeMs) { + recorder.putGrpcChannelQueuedLatencies(queuedTimeMs); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index b05d3832b3..774c6d9f22 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -220,9 +220,9 @@ public void afterResponse(long applicationLatency) { } @Override - public void requestBlockedOnChannel(long blockedTimeMs) { + public void grpcChannelQueuedLatencies(long queuedTimeMs) { for (BigtableTracer tracer : bigtableTracers) { - tracer.requestBlockedOnChannel(blockedTimeMs); + tracer.grpcChannelQueuedLatencies(queuedTimeMs); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index 37cae4e48c..d68f4379cd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -200,7 +200,11 @@ static void recordMetricsFromMetadata( tracer.recordGfeMetadata(latency, throwable); } - static GrpcCallContext addHandlerToCallContext( + /** + * This method bridges gRPC stream tracing to bigtable tracing by adding a {@link + * io.grpc.ClientStreamTracer} to the callContext. + */ + static GrpcCallContext injectBigtableStreamTracer( ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) { if (context instanceof GrpcCallContext) { // context should always be an instance of GrpcCallContext. Sanity check just in case. diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index a265422948..5f06baa0ff 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -512,7 +512,7 @@ public void testBlockedOnChannelServerStreamLatencies() throws InterruptedExcept ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) - .putRequestBlockedOnChannel(blockedTime.capture()); + .putGrpcChannelQueuedLatencies(blockedTime.capture()); assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); } @@ -528,7 +528,7 @@ public void testBlockedOnChannelUnaryLatencies() throws InterruptedException { ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) - .putRequestBlockedOnChannel(blockedTime.capture()); + .putGrpcChannelQueuedLatencies(blockedTime.capture()); assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index 85b458264b..11dd0b5095 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -254,8 +254,8 @@ public void testMethodsOverride() { @Test public void testRequestBlockedOnChannel() { - compositeTracer.requestBlockedOnChannel(5L); - verify(child3, times(1)).requestBlockedOnChannel(5L); - verify(child4, times(1)).requestBlockedOnChannel(5L); + compositeTracer.grpcChannelQueuedLatencies(5L); + verify(child3, times(1)).grpcChannelQueuedLatencies(5L); + verify(child4, times(1)).grpcChannelQueuedLatencies(5L); } } From bede5ab58bc2c7fa96012f5fb51f1138aa5ea222 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 17 Apr 2023 13:56:52 -0400 Subject: [PATCH 5/9] reformat --- .../data/v2/stub/metrics/BigtableTracerBatchedCallable.java | 6 +++--- .../google/cloud/bigtable/data/v2/stub/metrics/Util.java | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java index 111c4d76e8..dd3eb6f356 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java @@ -25,9 +25,9 @@ import javax.annotation.Nonnull; /** - * This callable will do everything described in {@link BigtableTracerUnaryCallable} - * except that it won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, - * we only want to calculate the total time client is blocked because of flow control. + * This callable will do everything described in {@link BigtableTracerUnaryCallable} except that it + * won't inject a {@link BigtableGrpcStreamTracer}. For batching calls, we only want to calculate + * the total time client is blocked because of flow control. */ @InternalApi public class BigtableTracerBatchedCallable diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index d68f4379cd..8baf6a15f4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -207,14 +207,15 @@ static void recordMetricsFromMetadata( static GrpcCallContext injectBigtableStreamTracer( ApiCallContext context, GrpcResponseMetadata responseMetadata, BigtableTracer tracer) { if (context instanceof GrpcCallContext) { - // context should always be an instance of GrpcCallContext. Sanity check just in case. GrpcCallContext callContext = (GrpcCallContext) context; CallOptions callOptions = callContext.getCallOptions(); return responseMetadata.addHandlers( callContext.withCallOptions( callOptions.withStreamTracerFactory(new BigtableGrpcStreamTracer.Factory(tracer)))); } else { - return responseMetadata.addHandlers(context); + // context should always be an instance of GrpcCallContext. If not throw an exception + // so we can see what class context is. + throw new RuntimeException("Unexpected context class: " + context.getClass().getName()); } } } From 56aa26354fba5f56f06351b9a9dc5efd5e297bfe Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 17 Apr 2023 14:08:16 -0400 Subject: [PATCH 6/9] remove unused variable --- .../com/google/cloud/bigtable/stats/StatsRecorderWrapper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java index 7ae287cff9..aafb75c6fb 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java @@ -40,7 +40,6 @@ public class StatsRecorderWrapper { private final SpanName spanName; private final Map statsAttributes; - private volatile long accumulatedLatency = 0; private MeasureMap attemptMeasureMap; private MeasureMap operationMeasureMap; From b452e3e9e30edf1196115b919f67ebf9c15fbd11 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 17 Apr 2023 14:55:29 -0400 Subject: [PATCH 7/9] update --- .../data/v2/stub/EnhancedBigtableStub.java | 6 ++--- ...> BigtableTracerBatchedUnaryCallable.java} | 5 ++-- .../BigtableTracerStreamingCallable.java | 23 ++++++------------- 3 files changed, 13 insertions(+), 21 deletions(-) rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/{BigtableTracerBatchedCallable.java => BigtableTracerBatchedUnaryCallable.java} (91%) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 342dcc1085..820dc7c652 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -87,7 +87,7 @@ import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable; -import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerBatchedUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; @@ -510,7 +510,7 @@ private UnaryCallable> createBulkReadRowsCallable( new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); UnaryCallable> withBigtableTracer = - new BigtableTracerBatchedCallable<>(tracedBatcher); + new BigtableTracerBatchedUnaryCallable<>(tracedBatcher); UnaryCallable> traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span); @@ -642,7 +642,7 @@ private UnaryCallable createBulkMutateRowsCallable() { new TracedBatcherUnaryCallable<>(userFacing); UnaryCallable withBigtableTracer = - new BigtableTracerBatchedCallable<>(tracedBatcherUnaryCallable); + new BigtableTracerBatchedUnaryCallable<>(tracedBatcherUnaryCallable); UnaryCallable traced = new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java similarity index 91% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java index dd3eb6f356..06722aaea5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable.java @@ -30,12 +30,13 @@ * the total time client is blocked because of flow control. */ @InternalApi -public class BigtableTracerBatchedCallable +public class BigtableTracerBatchedUnaryCallable extends BigtableTracerUnaryCallable { private UnaryCallable innerCallable; - public BigtableTracerBatchedCallable(@Nonnull UnaryCallable innerCallable) { + public BigtableTracerBatchedUnaryCallable( + @Nonnull UnaryCallable innerCallable) { super(innerCallable); this.innerCallable = innerCallable; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index 200393dc74..167cd0dc2e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -37,6 +37,8 @@ *
  • -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and * cluster ids. *
  • -Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream. + *
  • -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an + * RPC spent in a grpc channel queue. *
  • This class is considered an internal implementation detail and not meant to be used by * applications. */ @@ -45,17 +47,10 @@ public class BigtableTracerStreamingCallable extends ServerStreamingCallable { private final ServerStreamingCallable innerCallable; - private final boolean batching; public BigtableTracerStreamingCallable( @Nonnull ServerStreamingCallable callable) { - this(callable, false); - } - - public BigtableTracerStreamingCallable( - @Nonnull ServerStreamingCallable callable, boolean batching) { this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set"); - this.batching = batching; } @Override @@ -67,15 +62,11 @@ public void call( BigtableTracerResponseObserver innerObserver = new BigtableTracerResponseObserver<>( responseObserver, (BigtableTracer) context.getTracer(), responseMetadata); - if (batching) { - innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context)); - } else { - innerCallable.call( - request, - innerObserver, - Util.injectBigtableStreamTracer( - context, responseMetadata, (BigtableTracer) context.getTracer())); - } + innerCallable.call( + request, + innerObserver, + Util.injectBigtableStreamTracer( + context, responseMetadata, (BigtableTracer) context.getTracer())); } else { innerCallable.call(request, responseObserver, context); } From 58c53fe51ddc3795193c02067242de9707b7a9be Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 18 Apr 2023 10:31:05 -0400 Subject: [PATCH 8/9] update test --- .../stub/metrics/BuiltinMetricsTracerTest.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index 5f06baa0ff..bd48b463bd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -221,19 +221,13 @@ public void sendMessage(ReqT message) { final ApiFunction oldConfigurator = channelProvider.getChannelConfigurator(); - @SuppressWarnings("rawtypes") - final ApiFunction newConfigurator = - new ApiFunction() { - @Override - @SuppressWarnings("rawtypes") - public ManagedChannelBuilder apply(ManagedChannelBuilder builder) { - if (oldConfigurator != null) { - builder = oldConfigurator.apply(builder); - } - return builder.intercept(clientInterceptor); + channelProvider.setChannelConfigurator( + (builder) -> { + if (oldConfigurator != null) { + builder = oldConfigurator.apply(builder); } - }; - channelProvider.setChannelConfigurator(newConfigurator); + return builder.intercept(clientInterceptor); + }); stubSettingsBuilder.setTransportChannelProvider(channelProvider.build()); EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build(); From 9313db4d4b8f683dbf1c2dbcc4b99c369a9b655d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 18 Apr 2023 14:51:15 -0400 Subject: [PATCH 9/9] aggregate grpc queued and batch flow controlled latencies --- .../clirr-ignored-differences.xml | 6 ++++++ .../bigtable/stats/StatsRecorderWrapper.java | 8 ++------ .../bigtable/stats/StatsRecorderWrapperTest.java | 6 ++---- .../v2/stub/metrics/BuiltinMetricsTracer.java | 8 ++++++-- .../stub/metrics/BuiltinMetricsTracerTest.java | 16 ++++++++-------- 5 files changed, 24 insertions(+), 20 deletions(-) diff --git a/google-cloud-bigtable-stats/clirr-ignored-differences.xml b/google-cloud-bigtable-stats/clirr-ignored-differences.xml index ff42f58da4..a920751495 100644 --- a/google-cloud-bigtable-stats/clirr-ignored-differences.xml +++ b/google-cloud-bigtable-stats/clirr-ignored-differences.xml @@ -13,4 +13,10 @@ com/google/cloud/bigtable/stats/StatsRecorderWrapper void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String) + + + 7002 + com/google/cloud/bigtable/stats/StatsRecorderWrapper + void putBatchRequestThrottled(long) + diff --git a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java index aafb75c6fb..88eab077c3 100644 --- a/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java +++ b/google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsRecorderWrapper.java @@ -115,12 +115,8 @@ public void putGfeMissingHeaders(long connectivityErrors) { attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors); } - public void putBatchRequestThrottled(long throttledTimeMs) { - operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs); - } - - public void putGrpcChannelQueuedLatencies(long blockedTime) { - attemptMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, blockedTime); + public void putClientBlockingLatencies(long clientBlockingLatency) { + operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, clientBlockingLatency); } private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) { diff --git a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java index 80d4eaefa0..b68e4f1a1b 100644 --- a/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java +++ b/google-cloud-bigtable-stats/src/test/java/com/google/cloud/bigtable/stats/StatsRecorderWrapperTest.java @@ -91,8 +91,7 @@ public void testStreamingOperation() throws InterruptedException { recorderWrapper.putGfeLatencies(serverLatency); recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); - recorderWrapper.putBatchRequestThrottled(throttlingLatency); - recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency); + recorderWrapper.putClientBlockingLatencies(throttlingLatency); recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER); @@ -291,8 +290,7 @@ public void testUnaryOperations() throws InterruptedException { recorderWrapper.putGfeLatencies(serverLatency); recorderWrapper.putGfeMissingHeaders(connectivityErrorCount); recorderWrapper.putFirstResponseLatencies(firstResponseLatency); - recorderWrapper.putBatchRequestThrottled(throttlingLatency); - recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency); + recorderWrapper.putClientBlockingLatencies(throttlingLatency); recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index de048b5ab8..a8b8148d3e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer { private String zone = "global"; private String cluster = "unspecified"; + private AtomicLong totalClientBlockingTime = new AtomicLong(0); + @VisibleForTesting BuiltinMetricsTracer( OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) { @@ -219,12 +221,12 @@ public void setLocations(String zone, String cluster) { @Override public void batchRequestThrottled(long throttledTimeMs) { - recorder.putBatchRequestThrottled(throttledTimeMs); + totalClientBlockingTime.addAndGet(throttledTimeMs); } @Override public void grpcChannelQueuedLatencies(long queuedTimeMs) { - recorder.putGrpcChannelQueuedLatencies(queuedTimeMs); + totalClientBlockingTime.addAndGet(queuedTimeMs); } @Override @@ -271,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) { } } + recorder.putClientBlockingLatencies(totalClientBlockingTime.get()); + // Patch the status until it's fixed in gax. When an attempt failed, // it'll throw a ServerStreamingAttemptException. Unwrap the exception // so it could get processed by extractStatus diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index bd48b463bd..3bc283a7f7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -480,8 +480,8 @@ public void testBatchBlockingLatencies() throws InterruptedException { int expectedNumRequests = 6 / batchElementCount; ArgumentCaptor throttledTime = ArgumentCaptor.forClass(Long.class); - verify(statsRecorderWrapper, times(expectedNumRequests)) - .putBatchRequestThrottled(throttledTime.capture()); + verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests)) + .putClientBlockingLatencies(throttledTime.capture()); // Adding the first 2 elements should not get throttled since the batch is empty assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0); @@ -493,7 +493,7 @@ public void testBatchBlockingLatencies() throws InterruptedException { } @Test - public void testBlockedOnChannelServerStreamLatencies() throws InterruptedException { + public void testQueuedOnChannelServerStreamLatencies() throws InterruptedException { when(mockFactory.newTracer(any(), any(), any())) .thenReturn( new BuiltinMetricsTracer( @@ -505,14 +505,14 @@ public void testBlockedOnChannelServerStreamLatencies() throws InterruptedExcept ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); - verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) - .putGrpcChannelQueuedLatencies(blockedTime.capture()); + verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get())) + .putClientBlockingLatencies(blockedTime.capture()); assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); } @Test - public void testBlockedOnChannelUnaryLatencies() throws InterruptedException { + public void testQueuedOnChannelUnaryLatencies() throws InterruptedException { when(mockFactory.newTracer(any(), any(), any())) .thenReturn( new BuiltinMetricsTracer( @@ -521,8 +521,8 @@ public void testBlockedOnChannelUnaryLatencies() throws InterruptedException { ArgumentCaptor blockedTime = ArgumentCaptor.forClass(Long.class); - verify(statsRecorderWrapper, times(fakeService.attemptCounter.get())) - .putGrpcChannelQueuedLatencies(blockedTime.capture()); + verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get())) + .putClientBlockingLatencies(blockedTime.capture()); assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY); assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY);