From 25d61e5685ccbad13dcd0b2e5fad8e9c4406c975 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 10 May 2022 11:49:36 -0400 Subject: [PATCH] feat: update tracers to use built in metrics --- .../clirr-ignored-differences.xml | 10 + google-cloud-bigtable/pom.xml | 9 + .../data/v2/stub/EnhancedBigtableStub.java | 72 ++-- .../data/v2/stub/metrics/BigtableTracer.java | 12 +- ...a => BigtableTracerStreamingCallable.java} | 80 +++- ....java => BigtableTracerUnaryCallable.java} | 49 ++- .../v2/stub/metrics/BuiltinMetricsTracer.java | 244 +++++++++++ .../metrics/BuiltinMetricsTracerFactory.java | 61 +++ .../data/v2/stub/metrics/CompositeTracer.java | 21 +- .../data/v2/stub/metrics/MetricsTracer.java | 21 +- .../bigtable/data/v2/stub/metrics/Util.java | 44 +- ...t.java => BigtableTracerCallableTest.java} | 21 +- .../metrics/BuiltinMetricsTracerTest.java | 389 ++++++++++++++++++ .../v2/stub/metrics/CompositeTracerTest.java | 12 +- .../data/v2/stub/metrics/UtilTest.java | 10 +- pom.xml | 9 + 16 files changed, 964 insertions(+), 100 deletions(-) rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/{HeaderTracerStreamingCallable.java => BigtableTracerStreamingCallable.java} (53%) rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/{HeaderTracerUnaryCallable.java => BigtableTracerUnaryCallable.java} (55%) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java rename google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/{HeaderTracerCallableTest.java => BigtableTracerCallableTest.java} (95%) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 767e12e548..588327d0de 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -61,4 +61,14 @@ com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable * + + + 8001 + com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable + + + + 8001 + com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable + diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 7676d0612f..9ac8711c90 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -54,6 +54,11 @@ pom import + + com.google.cloud + google-cloud-bigtable-stats + 2.6.3-SNAPSHOT + @@ -61,6 +66,10 @@ + + com.google.cloud + google-cloud-bigtable-stats + com.google.api diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d8daaa80e6..393b106d5c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -70,9 +70,10 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; -import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable; -import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; @@ -89,6 +90,7 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; +import com.google.cloud.bigtable.stats.StatsWrapper; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -151,6 +153,16 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) public static EnhancedBigtableStubSettings finalizeSettings( EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) throws IOException { + StatsWrapper builtinMetricsWrapper = new StatsWrapper(false); + return finalizeSettings(settings, tagger, stats, builtinMetricsWrapper); + } + + private static EnhancedBigtableStubSettings finalizeSettings( + EnhancedBigtableStubSettings settings, + Tagger tagger, + StatsRecorder stats, + StatsWrapper builtinMetricsWrapper) + throws IOException { EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); // TODO: this implementation is on the cusp of unwieldy, if we end up adding more features @@ -194,6 +206,12 @@ public static EnhancedBigtableStubSettings finalizeSettings( RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(settings.getAppProfileId())) .build(); + ImmutableMap builtinAttributes = + ImmutableMap.builder() + .put("project_id", settings.getProjectId()) + .put("instance_id", settings.getInstanceId()) + .put("app_profile", settings.getAppProfileId()) + .build(); // Inject Opencensus instrumentation builder.setTracerFactory( new CompositeTracerFactory( @@ -218,6 +236,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( .build()), // Add OpenCensus Metrics MetricsTracerFactory.create(tagger, stats, attributes), + BuiltinMetricsTracerFactory.create(builtinMetricsWrapper, builtinAttributes), // Add user configured tracer settings.getTracerFactory()))); return builder.build(); @@ -369,7 +388,7 @@ public UnaryCallable createReadRowCallable(RowAdapter *
  • Upon receiving the response stream, it will merge the {@link * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row * implementation can be configured by the {@code rowAdapter} parameter. - *
  • Add header tracer for tracking GFE metrics. + *
  • Add BigtableTracer callable for tracking Bigtable specific metrics *
  • Retry/resume on failure. *
  • Filter out marker rows. * @@ -420,13 +439,13 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable watched = Callables.watched(merging, innerSettings, clientContext); - ServerStreamingCallable withHeaderTracer = - new HeaderTracerStreamingCallable<>(watched); + ServerStreamingCallable withBigtableTracer = + new BigtableTracerStreamingCallable<>(watched); // Retry logic is split into 2 parts to workaround a rare edge case described in // ReadRowsRetryCompletedCallable ServerStreamingCallable retrying1 = - new ReadRowsRetryCompletedCallable<>(withHeaderTracer); + new ReadRowsRetryCompletedCallable<>(withBigtableTracer); ServerStreamingCallable retrying2 = Callables.retrying(retrying1, innerSettings, clientContext); @@ -465,11 +484,11 @@ private UnaryCallable> createBulkReadRowsCallable( UnaryCallable> tracedBatcher = new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); - UnaryCallable> withHeaderTracer = - new HeaderTracerUnaryCallable(tracedBatcher); + UnaryCallable> withBigtableTracer = + new BigtableTracerUnaryCallable<>(tracedBatcher); UnaryCallable> traced = - new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span); + new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } @@ -511,11 +530,11 @@ public Map extract( UnaryCallable> withStatsHeaders = new StatsHeadersUnaryCallable<>(spoolable); - UnaryCallable> withHeaderTracer = - new HeaderTracerUnaryCallable<>(withStatsHeaders); + UnaryCallable> withBigtableTracer = + new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable> retryable = - Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext); + Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); return createUserFacingUnaryCallable( methodName, new SampleRowKeysCallable(retryable, requestContext)); @@ -550,11 +569,11 @@ public Map extract(MutateRowRequest mutateRowRequest) { UnaryCallable withStatsHeaders = new StatsHeadersUnaryCallable<>(base); - UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(withStatsHeaders); + UnaryCallable withBigtableTracer = + new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext); + Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext); return createUserFacingUnaryCallable( methodName, new MutateRowCallable(retrying, requestContext)); @@ -594,13 +613,13 @@ private UnaryCallable createBulkMutateRowsCallable() { SpanName spanName = getSpanName("MutateRows"); - UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing); - - UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(tracedBatcher); + UnaryCallable tracedBatcherUnaryCallable = + new TracedBatcherUnaryCallable<>(userFacing); + UnaryCallable withBigtableTracer = + new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable); UnaryCallable traced = - new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName); + new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } @@ -738,11 +757,11 @@ public Map extract( UnaryCallable withStatsHeaders = new StatsHeadersUnaryCallable<>(base); - UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(withStatsHeaders); + UnaryCallable withBigtableTracer = + new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext); + Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); return createUserFacingUnaryCallable( methodName, new CheckAndMutateRowCallable(retrying, requestContext)); @@ -779,11 +798,12 @@ public Map extract(ReadModifyWriteRowRequest request) { new StatsHeadersUnaryCallable<>(base); String methodName = "ReadModifyWriteRow"; - UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(withStatsHeaders); + UnaryCallable withBigtableTracer = + new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext); + Callables.retrying( + withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); return createUserFacingUnaryCallable( methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 3d7707cc4c..421c5b4724 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -25,7 +25,7 @@ * A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base * implementation that does nothing. */ -@BetaApi("This surface is stable yet it might be removed in the future.") +@BetaApi("This surface is not stable yet it might be removed in the future.") public class BigtableTracer extends BaseApiTracer { private volatile int attempt = 0; @@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) { this.attempt = attemptNumber; } + /** annotate when onRequest is called */ + public void onRequest() { + // noop + } + /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from @@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa public void batchRequestThrottled(long throttledTimeMs) { // noop } + + /** Set the Bigtable zone and cluster so metrics can be tagged with location information. */ + public void setLocations(String zone, String cluster) { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java similarity index 53% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index 31c5cf1960..80731a2fa0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -26,26 +26,28 @@ import javax.annotation.Nonnull; /** - * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers - * returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that - * were injected in the header/trailer and publish them to OpenCensus. If {@link - * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never - * reached GFE, and it'll increment the gfe_header_missing_counter in this case. + * This callable will: * - *

    If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata. - * This is for the case where direct path is enabled, all the requests won't go through GFE and - * therefore won't have the server-timing header. + *

    - Inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC + * methods upon completion. The {@link BigtableTracer} will process metrics that were injected in + * the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()} + * returned null, it probably means that the request has never reached GFE, and it'll increment the + * gfe_header_missing_counter in this case. + * + *

    - Call {@link BigtableTracer#onRequest()} to record the request events in a stream. + * + *

    - Get Bigtable zone and cluster information from response trailer and record in tracer. * *

    This class is considered an internal implementation detail and not meant to be used by * applications. */ @InternalApi -public class HeaderTracerStreamingCallable +public class BigtableTracerStreamingCallable extends ServerStreamingCallable { private final ServerStreamingCallable innerCallable; - public HeaderTracerStreamingCallable( + public BigtableTracerStreamingCallable( @Nonnull ServerStreamingCallable callable) { this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set"); } @@ -55,9 +57,9 @@ public void call( RequestT request, ResponseObserver responseObserver, ApiCallContext context) { final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); // tracer should always be an instance of bigtable tracer - if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) { - HeaderTracerResponseObserver innerObserver = - new HeaderTracerResponseObserver<>( + if (context.getTracer() instanceof BigtableTracer) { + BigtableTracerResponseObserver innerObserver = + new BigtableTracerResponseObserver<>( responseObserver, (BigtableTracer) context.getTracer(), responseMetadata); innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context)); } else { @@ -65,13 +67,13 @@ public void call( } } - private class HeaderTracerResponseObserver implements ResponseObserver { + private class BigtableTracerResponseObserver implements ResponseObserver { private final BigtableTracer tracer; private final ResponseObserver outerObserver; private final GrpcResponseMetadata responseMetadata; - HeaderTracerResponseObserver( + BigtableTracerResponseObserver( ResponseObserver observer, BigtableTracer tracer, GrpcResponseMetadata metadata) { @@ -82,7 +84,8 @@ private class HeaderTracerResponseObserver implements ResponseObserve @Override public void onStart(final StreamController controller) { - outerObserver.onStart(controller); + final StreamController tracedController = new TracedStreamController(controller, tracer); + outerObserver.onStart(tracedController); } @Override @@ -97,6 +100,16 @@ public void onError(Throwable t) { Metadata metadata = responseMetadata.getMetadata(); Long latency = Util.getGfeLatency(metadata); tracer.recordGfeMetadata(latency, t); + // try { + // byte[] trailers = + // responseMetadata + // .getTrailingMetadata() + // .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER)); + // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers); + // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId()); + // } catch (NullPointerException | InvalidProtocolBufferException e) { + // } + outerObserver.onError(t); } @@ -105,7 +118,42 @@ public void onComplete() { Metadata metadata = responseMetadata.getMetadata(); Long latency = Util.getGfeLatency(metadata); tracer.recordGfeMetadata(latency, null); + // try { + // byte[] trailers = + // responseMetadata.getTrailingMetadata().get(Metadata.Key.of(Util.TRAILER_KEY, + // Metadata.BINARY_BYTE_MARSHALLER)); + // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers); + // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId()); + // } catch (NullPointerException | InvalidProtocolBufferException e) { + // } + outerObserver.onComplete(); } } + + private class TracedStreamController implements StreamController { + private final StreamController innerController; + private final BigtableTracer tracer; + + TracedStreamController(StreamController innerController, BigtableTracer tracer) { + this.innerController = innerController; + this.tracer = tracer; + } + + @Override + public void cancel() { + innerController.cancel(); + } + + @Override + public void disableAutoInboundFlowControl() { + innerController.disableAutoInboundFlowControl(); + } + + @Override + public void request(int i) { + tracer.onRequest(); + innerController.request(i); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java similarity index 55% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index 6335b433ef..a434a97b3c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -28,37 +28,37 @@ import javax.annotation.Nonnull; /** - * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers - * returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that - * were injected in the header/trailer and publish them to OpenCensus. If {@link - * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never - * reached GFE, and it'll increment the gfe_header_missing_counter in this case. + * This callable will: * - *

    If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata. - * This is for the case where direct path is enabled, all the requests won't go through GFE and - * therefore won't have the server-timing header. + *

    - Inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC + * methods upon completion. The {@link BigtableTracer} will process metrics that were injected in + * the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()} + * returned null, it probably means that the request has never reached GFE, and it'll increment the + * gfe_header_missing_counter in this case. + * + *

    - Get Bigtable zone and cluster information from response trailer and record in tracer. * *

    This class is considered an internal implementation detail and not meant to be used by * applications. */ @InternalApi -public class HeaderTracerUnaryCallable +public class BigtableTracerUnaryCallable extends UnaryCallable { private final UnaryCallable innerCallable; - public HeaderTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) { + public BigtableTracerUnaryCallable(@Nonnull UnaryCallable innerCallable) { this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); } @Override public ApiFuture futureCall(RequestT request, ApiCallContext context) { // tracer should always be an instance of BigtableTracer - if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) { + if (context.getTracer() instanceof BigtableTracer) { final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context); - HeaderTracerUnaryCallback callback = - new HeaderTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); + BigtableTracerUnaryCallback callback = + new BigtableTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata); ApiFuture future = innerCallable.futureCall(request, contextWithResponseMetadata); ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); return future; @@ -67,12 +67,12 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { } } - class HeaderTracerUnaryCallback implements ApiFutureCallback { + class BigtableTracerUnaryCallback implements ApiFutureCallback { private final BigtableTracer tracer; private final GrpcResponseMetadata responseMetadata; - HeaderTracerUnaryCallback(BigtableTracer tracer, GrpcResponseMetadata responseMetadata) { + BigtableTracerUnaryCallback(BigtableTracer tracer, GrpcResponseMetadata responseMetadata) { this.tracer = tracer; this.responseMetadata = responseMetadata; } @@ -82,6 +82,16 @@ public void onFailure(Throwable throwable) { Metadata metadata = responseMetadata.getMetadata(); Long latency = Util.getGfeLatency(metadata); tracer.recordGfeMetadata(latency, throwable); + // try { + // byte[] trailers = + // responseMetadata + // .getTrailingMetadata() + // .get(Metadata.Key.of(Util.TRAILER_KEY, + // Metadata.BINARY_BYTE_MARSHALLER)); + // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers); + // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId()); + // } catch (NullPointerException | InvalidProtocolBufferException e) { + // } } @Override @@ -89,6 +99,15 @@ public void onSuccess(ResponseT response) { Metadata metadata = responseMetadata.getMetadata(); Long latency = Util.getGfeLatency(metadata); tracer.recordGfeMetadata(latency, null); + // try { + // byte[] trailers = + // responseMetadata + // .getTrailingMetadata() + // .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER)); + // ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers); + // tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId()); + // } catch (NullPointerException | InvalidProtocolBufferException e) { + // } } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java new file mode 100644 index 0000000000..d87d0ffc6a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -0,0 +1,244 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.base.Stopwatch; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +public class BuiltinMetricsTracer extends BigtableTracer { + + private BuiltinMetricsRecorder recorder; + + private final ApiTracerFactory.OperationType operationType; + private final SpanName spanName; + + // Operation level metrics + private final AtomicBoolean opFinished = new AtomicBoolean(); + private final Stopwatch operationTimer = Stopwatch.createStarted(); + private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); + + // Attempt level metrics + private int attemptCount = 0; + private Stopwatch attemptTimer; + private volatile int attempt = 0; + + // Total application latency + private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted(); + private final AtomicLong totalApplicationLatency = new AtomicLong(0); + + // Monitored resource labels + private String tableId = "undefined"; + private String zone = "undefined"; + private String cluster = "undefined"; + + // gfe stats + private AtomicLong gfeMissingHeaders = new AtomicLong(0); + + BuiltinMetricsTracer( + ApiTracerFactory.OperationType operationType, + SpanName spanName, + Map attributes, + StatsWrapper builtinMetricsWrapper, + BuiltinMetricsRecorder recorder) { + this.operationType = operationType; + this.spanName = spanName; + if (recorder != null) { + this.recorder = recorder; + } else { + this.recorder = + new BuiltinMetricsRecorder(operationType, spanName, attributes, builtinMetricsWrapper); + } + } + + @Override + public Scope inScope() { + return new Scope() { + @Override + public void close() {} + }; + } + + @Override + public void operationSucceeded() { + recordOperationCompletion(null); + } + + @Override + public void operationCancelled() { + recordOperationCompletion(new CancellationException()); + } + + @Override + public void operationFailed(Throwable error) { + recordOperationCompletion(error); + } + + @Override + public void attemptStarted(int attemptNumber) { + attemptStarted(null, attemptNumber); + } + + @Override + public void attemptStarted(Object request, int attemptNumber) { + this.attempt = attemptNumber; + attemptCount++; + attemptTimer = Stopwatch.createStarted(); + if (request != null) { + this.tableId = Util.extractTableId(request); + } + if (applicationLatencyTimer.isRunning()) { + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + applicationLatencyTimer.reset(); + } + } + + @Override + public void attemptSucceeded() { + recordAttemptCompletion(null); + } + + @Override + public void attemptCancelled() { + recordAttemptCompletion(new CancellationException()); + } + + @Override + public void attemptFailed(Throwable error, Duration delay) { + if (!applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.start(); + } + recordAttemptCompletion(error); + } + + @Override + public void attemptFailedRetriesExhausted(Throwable error) { + super.attemptFailedRetriesExhausted(error); + } + + @Override + public void attemptPermanentFailure(Throwable error) { + super.attemptPermanentFailure(error); + } + + @Override + public void lroStartFailed(Throwable error) { + super.lroStartFailed(error); + } + + @Override + public void lroStartSucceeded() { + super.lroStartSucceeded(); + } + + @Override + public void onRequest() { + if (applicationLatencyTimer.isRunning()) { + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + applicationLatencyTimer.reset(); + } + } + + @Override + public void responseReceived() { + if (!applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.start(); + } + if (firstResponsePerOpTimer.isRunning()) { + firstResponsePerOpTimer.stop(); + } + } + + @Override + public void requestSent() { + super.requestSent(); + } + + @Override + public void batchRequestSent(long elementCount, long requestSize) { + super.batchRequestSent(elementCount, requestSize); + } + + @Override + public int getAttempt() { + return attempt; + } + + @Override + public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) { + // Record the metrics and put in the map after the attempt is done, so we can have cluster and + // zone information + if (latency != null) { + recorder.recordGfeLatencies(latency); + } else { + gfeMissingHeaders.incrementAndGet(); + } + recorder.recordGfeMissingHeaders(gfeMissingHeaders.get()); + } + + @Override + public void setLocations(String zone, String cluster) { + this.zone = zone; + this.cluster = cluster; + } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + recorder.recordBatchRequestThrottled(throttledTimeMs, tableId, zone, cluster); + } + + private void recordOperationCompletion(@Nullable Throwable status) { + if (!opFinished.compareAndSet(false, true)) { + return; + } + operationTimer.stop(); + + recorder.recordRetryCount(attemptCount); + + if (applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.stop(); + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + } + recorder.recordApplicationLatency(totalApplicationLatency.get(), tableId, zone, cluster); + + recorder.recordOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS)); + + if (operationType == ApiTracerFactory.OperationType.ServerStreaming + && spanName.getMethodName().equals("ReadRows")) { + recorder.recordFirstResponseLatency(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS)); + } + + recorder.recordOperationLevelWithoutStreaming( + Util.extractStatus(status), tableId, zone, cluster); + recorder.recordOperationLevelWithStreaming(Util.extractStatus(status), tableId, zone, cluster); + } + + private void recordAttemptCompletion(@Nullable Throwable status) { + recorder.recordAttemptLatency(attemptTimer.elapsed(TimeUnit.MILLISECONDS)); + + recorder.recordAttemptLevelWithoutStreaming(Util.extractStatus(status), tableId, zone, cluster); + recorder.recordAttemptLevelWithStreaming(Util.extractStatus(status), tableId, zone, cluster); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java new file mode 100644 index 0000000000..44671f6cea --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +@InternalApi("For internal use only") +public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory { + + private final ImmutableMap statsAttributes; + private final StatsWrapper statsWrapper; + private final BuiltinMetricsRecorder builtinMetricsRecorder; + + public static BuiltinMetricsTracerFactory create( + StatsWrapper statsWrapper, ImmutableMap statsAttributes) { + return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, null); + } + + @VisibleForTesting + static BuiltinMetricsTracerFactory create( + StatsWrapper statsWrapper, + ImmutableMap statsAttributes, + BuiltinMetricsRecorder recorder) { + return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, recorder); + } + + private BuiltinMetricsTracerFactory( + StatsWrapper statsWrapper, + ImmutableMap statsAttributes, + BuiltinMetricsRecorder recorder) { + this.statsAttributes = statsAttributes; + this.statsWrapper = statsWrapper; + this.builtinMetricsRecorder = recorder; + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + return new BuiltinMetricsTracer( + operationType, spanName, statsAttributes, statsWrapper, builtinMetricsRecorder); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 5f4580743b..29c16baafb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -92,9 +92,14 @@ public void connectionSelected(String id) { @Override public void attemptStarted(int attemptNumber) { + attemptStarted(null, attemptNumber); + } + + @Override + public void attemptStarted(Object request, int attemptNumber) { this.attempt = attemptNumber; for (ApiTracer child : children) { - child.attemptStarted(attemptNumber); + child.attemptStarted(request, attemptNumber); } } @@ -185,4 +190,18 @@ public void batchRequestThrottled(long throttledTimeMs) { tracer.batchRequestThrottled(throttledTimeMs); } } + + @Override + public void setLocations(String zone, String cluster) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.setLocations(zone, cluster); + } + } + + @Override + public void onRequest() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.onRequest(); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index f28b07c0cb..d27941bb71 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -118,7 +118,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { TagContextBuilder tagCtx = newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); measures.record(tagCtx.build()); } @@ -171,7 +173,9 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) { TagContextBuilder tagCtx = newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); measures.record(tagCtx.build()); } @@ -222,7 +226,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa } measures.record( newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)) + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable))) .build()); } @@ -248,4 +253,14 @@ private TagContextBuilder newTagCtxBuilder() { return tagCtx; } + + @Override + public void setLocations(String zone, String cluster) { + // noop + } + + @Override + public void onRequest() { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index 00995b717a..bfd64ee958 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -15,10 +15,19 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; +import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.TableName; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.grpc.Metadata; import io.grpc.Status; @@ -38,7 +47,8 @@ import javax.annotation.Nullable; /** Utilities to help integrating with OpenCensus. */ -class Util { +@InternalApi("For internal use only") +public class Util { static final Metadata.Key ATTEMPT_HEADER_KEY = Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER); static final Metadata.Key ATTEMPT_EPOCH_KEY = @@ -48,14 +58,14 @@ class Util { Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); - private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString()); + static final String TRAILER_KEY = "x-goog-ext-425905942-bin"; - /** Convert an exception into a value that can be used as an OpenCensus tag value. */ - static TagValue extractStatus(@Nullable Throwable error) { + /** Convert an exception into a value that can be used to create an OpenCensus tag value. */ + static String extractStatus(@Nullable Throwable error) { final String statusString; if (error == null) { - return OK_STATUS; + return StatusCode.Code.OK.toString(); } else if (error instanceof CancellationException) { statusString = Status.Code.CANCELLED.toString(); } else if (error instanceof ApiException) { @@ -68,14 +78,14 @@ static TagValue extractStatus(@Nullable Throwable error) { statusString = Code.UNKNOWN.toString(); } - return TagValue.create(statusString); + return statusString; } /** * Await the result of the future and convert it into a value that can be used as an OpenCensus * tag value. */ - static TagValue extractStatus(Future future) { + static TagValue extractStatusAsync(Future future) { Throwable error = null; try { @@ -88,7 +98,25 @@ static TagValue extractStatus(Future future) { } catch (RuntimeException e) { error = e; } - return extractStatus(error); + return TagValue.create(extractStatus(error)); + } + + static String extractTableId(Object request) { + String tableName = null; + if (request instanceof ReadRowsRequest) { + tableName = ((ReadRowsRequest) request).getTableName(); + } else if (request instanceof MutateRowsRequest) { + tableName = ((MutateRowsRequest) request).getTableName(); + } else if (request instanceof MutateRowRequest) { + tableName = ((MutateRowRequest) request).getTableName(); + } else if (request instanceof SampleRowKeysRequest) { + tableName = ((SampleRowKeysRequest) request).getTableName(); + } else if (request instanceof CheckAndMutateRowRequest) { + tableName = ((CheckAndMutateRowRequest) request).getTableName(); + } else if (request instanceof ReadModifyWriteRowRequest) { + tableName = ((ReadModifyWriteRowRequest) request).getTableName(); + } + return !Strings.isNullOrEmpty(tableName) ? TableName.parse(tableName).getTable() : "undefined"; } /** diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java similarity index 95% rename from google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java rename to google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index 03aad7f822..eba8411189 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -55,7 +55,6 @@ import io.grpc.stub.StreamObserver; import io.opencensus.impl.stats.StatsComponentImpl; import io.opencensus.stats.StatsComponent; -import io.opencensus.stats.ViewData; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tags; @@ -68,7 +67,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class HeaderTracerCallableTest { +public class BigtableTracerCallableTest { private FakeServiceHelper serviceHelper; private FakeServiceHelper serviceHelperNoHeader; @@ -383,24 +382,6 @@ public void testMetricsWithErrorResponse() throws InterruptedException { assertThat(missingCount).isEqualTo(attempts); } - @Test - public void testCallableBypassed() throws InterruptedException { - RpcViews.setGfeMetricsRegistered(false); - stub.readRowsCallable().call(Query.create(TABLE_ID)); - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); - ViewData headerMissingView = - localStats - .getViewManager() - .getView(RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW.getName()); - ViewData latencyView = - localStats.getViewManager().getView(RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW.getName()); - // Verify that the view is registered by it's not collecting metrics - assertThat(headerMissingView).isNotNull(); - assertThat(latencyView).isNotNull(); - assertThat(headerMissingView.getAggregationMap()).isEmpty(); - assertThat(latencyView.getAggregationMap()).isEmpty(); - } - private class FakeService extends BigtableImplBase { private final String defaultTableName = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java new file mode 100644 index 0000000000..06e756ba5a --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -0,0 +1,389 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.api.client.util.Lists; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceHelper; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.stats.BuiltinMetricsRecorder; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.threeten.bp.Duration; + +public class BuiltinMetricsTracerTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + private static final String ZONE_0 = "us-west-1"; + private static final String CLUSTER_0 = "cluster-0"; + private static final String ZONE_1 = "us-east-1"; + private static final String CLUSTER_1 = "cluster-1"; + private static final String UNDEFINED = "undefined"; + private static final long FAKE_SERVER_TIMING = 50; + private static final long SERVER_LATENCY = 500; + + private final AtomicInteger rpcCount = new AtomicInteger(0); + + private static final ReadRowsResponse READ_ROWS_RESPONSE_1 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-1")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + private static final ReadRowsResponse READ_ROWS_RESPONSE_2 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-2")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + private static final ReadRowsResponse READ_ROWS_RESPONSE_3 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-3")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private FakeServiceHelper serviceHelper; + + private BigtableGrpc.BigtableImplBase mockService; + + private EnhancedBigtableStub stub; + + private BuiltinMetricsRecorder builtinMetricsRecorder; + private ArgumentCaptor longValue; + private ArgumentCaptor intValue; + private ArgumentCaptor status; + private ArgumentCaptor tableId; + private ArgumentCaptor zone; + private ArgumentCaptor cluster; + + private Stopwatch serverRetryDelayStopwatch; + private AtomicLong serverTotalRetryDelay; + + @Before + public void setUp() throws Exception { + mockService = new FakeService(); + + serverRetryDelayStopwatch = Stopwatch.createUnstarted(); + serverTotalRetryDelay = new AtomicLong(0); + + // Add an interceptor to send location information in the trailers and add server-timing in + // headers + ServerInterceptor trailersInterceptor = + new ServerInterceptor() { + private AtomicInteger count = new AtomicInteger(0); + + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING)); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + // int currentCount = count.getAndIncrement(); + // if (currentCount == 0) { + // ResponseParams params = + // ResponseParams.newBuilder() + // .setZoneId(ZONE_0) + // .setClusterId(CLUSTER_0) + // .build(); + // byte[] byteArray = params.toByteArray(); + // trailers.put( + // Metadata.Key.of(Util.TRAILER_KEY, + // Metadata.BINARY_BYTE_MARSHALLER), + // byteArray); + // } else { + // ResponseParams params = + // ResponseParams.newBuilder() + // .setClusterId(CLUSTER_1) + // .setZoneId(ZONE_1) + // .build(); + // byte[] byteArray = params.toByteArray(); + // trailers.put( + // Metadata.Key.of(Util.TRAILER_KEY, + // Metadata.BINARY_BYTE_MARSHALLER), + // byteArray); + // } + super.close(status, trailers); + } + }, + metadata); + } + }; + + serviceHelper = new FakeServiceHelper(trailersInterceptor, mockService); + + builtinMetricsRecorder = Mockito.mock(BuiltinMetricsRecorder.class); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + EnhancedBigtableStubSettings.Builder stubSettingsBuilder = + settings.getStubSettings().toBuilder(); + stubSettingsBuilder + .mutateRowSettings() + .retrySettings() + .setInitialRetryDelay(Duration.ofMillis(200)); + stubSettingsBuilder.setTracerFactory( + BuiltinMetricsTracerFactory.create( + new StatsWrapper(false), ImmutableMap.of(), builtinMetricsRecorder)); + + EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build(); + stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); + + serviceHelper.start(); + + longValue = ArgumentCaptor.forClass(long.class); + intValue = ArgumentCaptor.forClass(int.class); + status = ArgumentCaptor.forClass(String.class); + tableId = ArgumentCaptor.forClass(String.class); + zone = ArgumentCaptor.forClass(String.class); + cluster = ArgumentCaptor.forClass(String.class); + } + + @After + public void tearDown() { + stub.close(); + serviceHelper.shutdown(); + } + + @Test + public void testOperationLatencies() { + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator()); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + verify(builtinMetricsRecorder).recordOperationLatencies(longValue.capture()); + + assertThat(longValue.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed)); + } + + @Test + public void testGfeMetrics() { + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + verify(builtinMetricsRecorder).recordGfeLatencies(longValue.capture()); + assertThat(longValue.getValue()).isEqualTo(FAKE_SERVER_TIMING); + + verify(builtinMetricsRecorder).recordGfeMissingHeaders(longValue.capture()); + assertThat(longValue.getValue()).isEqualTo(0); + } + + @Test + public void testReadRowsApplicationLatency() throws Exception { + final long applicationLatency = 1000; + final SettableApiFuture future = SettableApiFuture.create(); + final AtomicInteger counter = new AtomicInteger(0); + // We want to measure how long application waited before requesting another message after + // the previous message is returned from the server. Using ResponseObserver here so that the + // flow will be + // onResponse() -> sleep -> onRequest() (for the next message) which is exactly what we want to + // measure for + // application latency. + // If we do readRowsCallable().call(Query.create(TABLE_ID)).iterator() and iterate through the + // iterator and sleep in + // between responses, when the call started, the client will pre-fetch the first response, which + // won't be counted + // in application latency. So the test will be flaky and hard to debug. + stub.readRowsCallable() + .call( + Query.create(TABLE_ID), + new ResponseObserver() { + @Override + public void onStart(StreamController streamController) {} + + @Override + public void onResponse(Row row) { + try { + counter.incrementAndGet(); + Thread.sleep(applicationLatency); + } catch (InterruptedException e) { + } + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onComplete() { + future.set(null); + } + }); + future.get(); + + verify(builtinMetricsRecorder) + .recordApplicationLatency( + longValue.capture(), tableId.capture(), zone.capture(), cluster.capture()); + + assertThat(counter.get()).isGreaterThan(0); + assertThat(longValue.getValue()).isAtLeast(applicationLatency * counter.get()); + assertThat(longValue.getValue()).isLessThan(applicationLatency * (counter.get() + 1)); + } + + @Test + public void testMutateRowApplicationLatency() { + // Unary callable application latency is the delay between retries + stub.mutateRowCallable() + .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); + + verify(builtinMetricsRecorder) + .recordApplicationLatency( + longValue.capture(), tableId.capture(), zone.capture(), cluster.capture()); + + // Application latency should be slightly less than the total delay between 2 requests observed + // from the server side. To make + // the test less flaky comparing with half of the server side delay here. + assertThat(longValue.getValue()).isAtLeast(serverTotalRetryDelay.get() / 2); + assertThat(longValue.getValue()).isAtMost(serverTotalRetryDelay.get()); + } + + @Test + public void testRetryCount() { + stub.mutateRowCallable() + .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); + + verify(builtinMetricsRecorder).recordRetryCount(intValue.capture()); + + assertThat(intValue.getValue()).isEqualTo(3); + } + + @Test + public void testMutateRowAttempts() { + stub.mutateRowCallable() + .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); + + verify(builtinMetricsRecorder, times(3)) + .recordAttemptLevelWithStreaming( + status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + assertThat(zone.getAllValues().get(0)).isEqualTo(UNDEFINED); + assertThat(zone.getAllValues().get(1)).isEqualTo(UNDEFINED); + assertThat(zone.getAllValues().get(2)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(0)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(1)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(2)).isEqualTo(UNDEFINED); + assertThat(status.getAllValues().get(0)).isEqualTo("UNAVAILABLE"); + assertThat(status.getAllValues().get(1)).isEqualTo("UNAVAILABLE"); + assertThat(status.getAllValues().get(2)).isEqualTo("OK"); + } + + private class FakeService extends BigtableGrpc.BigtableImplBase { + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + try { + Thread.sleep(SERVER_LATENCY); + } catch (InterruptedException e) { + } + responseObserver.onNext(READ_ROWS_RESPONSE_1); + responseObserver.onNext(READ_ROWS_RESPONSE_2); + responseObserver.onNext(READ_ROWS_RESPONSE_3); + responseObserver.onCompleted(); + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + if (serverRetryDelayStopwatch.isRunning()) { + serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS)); + serverRetryDelayStopwatch.reset(); + } + if (rpcCount.get() < 2) { + responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + rpcCount.getAndIncrement(); + serverRetryDelayStopwatch.start(); + return; + } + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index 69a741d0e3..0de14636c6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -23,6 +23,7 @@ import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracer.Scope; +import com.google.bigtable.v2.ReadRowsRequest; import com.google.cloud.bigtable.misc_utilities.MethodComparator; import com.google.common.collect.ImmutableList; import io.grpc.Status; @@ -118,11 +119,12 @@ public void testConnectionSelected() { @Test public void testAttemptStarted() { - compositeTracer.attemptStarted(3); - verify(child1, times(1)).attemptStarted(3); - verify(child2, times(1)).attemptStarted(3); - verify(child3, times(1)).attemptStarted(3); - verify(child4, times(1)).attemptStarted(3); + ReadRowsRequest request = ReadRowsRequest.getDefaultInstance(); + compositeTracer.attemptStarted(request, 3); + verify(child1, times(1)).attemptStarted(request, 3); + verify(child2, times(1)).attemptStarted(request, 3); + verify(child3, times(1)).attemptStarted(request, 3); + verify(child4, times(1)).attemptStarted(request, 3); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java index efef3b67d2..134a7604de 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java @@ -30,13 +30,13 @@ public class UtilTest { @Test public void testOk() { - TagValue tagValue = Util.extractStatus((Throwable) null); + TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null)); assertThat(tagValue.asString()).isEqualTo("OK"); } @Test public void testOkFuture() { - TagValue tagValue = Util.extractStatus(Futures.immediateFuture(null)); + TagValue tagValue = Util.extractStatusAsync(Futures.immediateFuture(null)); assertThat(tagValue.asString()).isEqualTo("OK"); } @@ -45,7 +45,7 @@ public void testError() { DeadlineExceededException error = new DeadlineExceededException( "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); - TagValue tagValue = Util.extractStatus(error); + TagValue tagValue = TagValue.create(Util.extractStatus(error)); assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); } @@ -54,13 +54,13 @@ public void testErrorFuture() { DeadlineExceededException error = new DeadlineExceededException( "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); - TagValue tagValue = Util.extractStatus(Futures.immediateFailedFuture(error)); + TagValue tagValue = Util.extractStatusAsync(Futures.immediateFailedFuture(error)); assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); } @Test public void testCancelledFuture() { - TagValue tagValue = Util.extractStatus(Futures.immediateCancelledFuture()); + TagValue tagValue = Util.extractStatusAsync(Futures.immediateCancelledFuture()); assertThat(tagValue.asString()).isEqualTo("CANCELLED"); } } diff --git a/pom.xml b/pom.xml index dbfddb8ee7..57fffc3e4b 100644 --- a/pom.xml +++ b/pom.xml @@ -212,6 +212,15 @@ + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + +