From 38b28044375b47b0b25406dc319f4a544e9d37a9 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 10 May 2022 11:49:36 -0400 Subject: [PATCH] feat: update tracers to use built in metrics --- google-cloud-bigtable/pom.xml | 9 + .../data/v2/stub/EnhancedBigtableStub.java | 18 +- .../data/v2/stub/metrics/BigtableTracer.java | 12 +- .../v2/stub/metrics/BuiltinMetricsTracer.java | 244 +++++++++++++ .../metrics/BuiltinMetricsTracerFactory.java | 66 ++++ .../data/v2/stub/metrics/CompositeTracer.java | 21 +- .../data/v2/stub/metrics/MetricsTracer.java | 21 +- .../bigtable/data/v2/stub/metrics/Util.java | 44 ++- .../metrics/BuiltinMetricsTracerTest.java | 341 ++++++++++++++++++ .../v2/stub/metrics/CompositeTracerTest.java | 12 +- .../data/v2/stub/metrics/UtilTest.java | 10 +- pom.xml | 9 + 12 files changed, 780 insertions(+), 27 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 7676d0612f..9ac8711c90 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -54,6 +54,11 @@ pom import + + com.google.cloud + google-cloud-bigtable-stats + 2.6.3-SNAPSHOT + @@ -61,6 +66,10 @@ + + com.google.cloud + google-cloud-bigtable-stats + com.google.api diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d8daaa80e6..4087dc59b4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -70,6 +70,7 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable; @@ -89,6 +90,7 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; +import com.google.cloud.bigtable.stats.StatsWrapper; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -152,6 +154,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) throws IOException { EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); + StatsWrapper statsWrapper = StatsWrapper.get(); // TODO: this implementation is on the cusp of unwieldy, if we end up adding more features // consider splitting it up by feature. @@ -194,6 +197,12 @@ public static EnhancedBigtableStubSettings finalizeSettings( RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(settings.getAppProfileId())) .build(); + ImmutableMap builtinAttributes = + ImmutableMap.builder() + .put("project_id", settings.getProjectId()) + .put("instance_id", settings.getInstanceId()) + .put("app_profile", settings.getAppProfileId()) + .build(); // Inject Opencensus instrumentation builder.setTracerFactory( new CompositeTracerFactory( @@ -218,6 +227,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( .build()), // Add OpenCensus Metrics MetricsTracerFactory.create(tagger, stats, attributes), + BuiltinMetricsTracerFactory.create(statsWrapper, builtinAttributes), // Add user configured tracer settings.getTracerFactory()))); return builder.build(); @@ -466,7 +476,7 @@ private UnaryCallable> createBulkReadRowsCallable( new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); UnaryCallable> withHeaderTracer = - new HeaderTracerUnaryCallable(tracedBatcher); + new HeaderTracerUnaryCallable<>(tracedBatcher); UnaryCallable> traced = new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span); @@ -594,11 +604,11 @@ private UnaryCallable createBulkMutateRowsCallable() { SpanName spanName = getSpanName("MutateRows"); - UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing); + UnaryCallable tracedBatcherUnaryCallable = + new TracedBatcherUnaryCallable<>(userFacing); UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(tracedBatcher); - + new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable); UnaryCallable traced = new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 3d7707cc4c..421c5b4724 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -25,7 +25,7 @@ * A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base * implementation that does nothing. */ -@BetaApi("This surface is stable yet it might be removed in the future.") +@BetaApi("This surface is not stable yet it might be removed in the future.") public class BigtableTracer extends BaseApiTracer { private volatile int attempt = 0; @@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) { this.attempt = attemptNumber; } + /** annotate when onRequest is called */ + public void onRequest() { + // noop + } + /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from @@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa public void batchRequestThrottled(long throttledTimeMs) { // noop } + + /** Set the Bigtable zone and cluster so metrics can be tagged with location information. */ + public void setLocations(String zone, String cluster) { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java new file mode 100644 index 0000000000..445585ede4 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -0,0 +1,244 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.stats.StatsRecorderWrapper; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.base.Stopwatch; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +/** + * A {@link BigtableTracer} that records built-in metrics and publish under the + * bigtable.googleapis.com/client namespace + */ +class BuiltinMetricsTracer extends BigtableTracer { + + private StatsRecorderWrapper recorder; + + private final ApiTracerFactory.OperationType operationType; + private final SpanName spanName; + + // Operation level metrics + private final AtomicBoolean opFinished = new AtomicBoolean(); + private final Stopwatch operationTimer = Stopwatch.createStarted(); + private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); + + // Attempt level metrics + private int attemptCount = 0; + private Stopwatch attemptTimer; + private volatile int attempt = 0; + + // Total application latency + private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted(); + private final AtomicLong totalApplicationLatency = new AtomicLong(0); + + // Monitored resource labels + private String tableId = "undefined"; + private String zone = "undefined"; + private String cluster = "undefined"; + + // gfe stats + private AtomicLong gfeMissingHeaders = new AtomicLong(0); + + BuiltinMetricsTracer( + ApiTracerFactory.OperationType operationType, + SpanName spanName, + Map attributes, + StatsWrapper statsWrapper, + StatsRecorderWrapper recorder) { + this.operationType = operationType; + this.spanName = spanName; + if (recorder != null) { + this.recorder = recorder; + } else { + this.recorder = new StatsRecorderWrapper(operationType, spanName, attributes, statsWrapper); + } + } + + @Override + public Scope inScope() { + return new Scope() { + @Override + public void close() {} + }; + } + + @Override + public void operationSucceeded() { + recordOperationCompletion(null); + } + + @Override + public void operationCancelled() { + recordOperationCompletion(new CancellationException()); + } + + @Override + public void operationFailed(Throwable error) { + recordOperationCompletion(error); + } + + @Override + public void attemptStarted(int attemptNumber) { + attemptStarted(null, attemptNumber); + } + + @Override + public void attemptStarted(Object request, int attemptNumber) { + this.attempt = attemptNumber; + attemptCount++; + attemptTimer = Stopwatch.createStarted(); + if (request != null) { + this.tableId = Util.extractTableId(request); + } + if (applicationLatencyTimer.isRunning()) { + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + applicationLatencyTimer.reset(); + } + } + + @Override + public void attemptSucceeded() { + recordAttemptCompletion(null); + } + + @Override + public void attemptCancelled() { + recordAttemptCompletion(new CancellationException()); + } + + @Override + public void attemptFailed(Throwable error, Duration delay) { + if (!applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.start(); + } + recordAttemptCompletion(error); + } + + @Override + public void attemptFailedRetriesExhausted(Throwable error) { + super.attemptFailedRetriesExhausted(error); + } + + @Override + public void attemptPermanentFailure(Throwable error) { + super.attemptPermanentFailure(error); + } + + @Override + public void lroStartFailed(Throwable error) { + super.lroStartFailed(error); + } + + @Override + public void lroStartSucceeded() { + super.lroStartSucceeded(); + } + + @Override + public void onRequest() { + if (applicationLatencyTimer.isRunning()) { + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + applicationLatencyTimer.reset(); + } + } + + @Override + public void responseReceived() { + if (!applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.start(); + } + if (firstResponsePerOpTimer.isRunning()) { + firstResponsePerOpTimer.stop(); + } + } + + @Override + public void requestSent() { + super.requestSent(); + } + + @Override + public void batchRequestSent(long elementCount, long requestSize) { + super.batchRequestSent(elementCount, requestSize); + } + + @Override + public int getAttempt() { + return attempt; + } + + @Override + public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) { + // Record the metrics and put in the map after the attempt is done, so we can have cluster and + // zone information + if (latency != null) { + recorder.putGfeLatencies(latency); + } else { + gfeMissingHeaders.incrementAndGet(); + } + recorder.putGfeMissingHeaders(gfeMissingHeaders.get()); + } + + @Override + public void setLocations(String zone, String cluster) { + this.zone = zone; + this.cluster = cluster; + } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + recorder.putBatchRequestThrottled(throttledTimeMs); + } + + private void recordOperationCompletion(@Nullable Throwable status) { + if (!opFinished.compareAndSet(false, true)) { + return; + } + operationTimer.stop(); + + recorder.putRetryCount(attemptCount); + + if (applicationLatencyTimer.isRunning()) { + applicationLatencyTimer.stop(); + totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); + } + recorder.putApplicationLatencies(totalApplicationLatency.get()); + + recorder.putOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS)); + + if (operationType == ApiTracerFactory.OperationType.ServerStreaming + && spanName.getMethodName().equals("ReadRows")) { + recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS)); + } + + recorder.record(Util.extractStatus(status), tableId, zone, cluster); + } + + private void recordAttemptCompletion(@Nullable Throwable status) { + recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS)); + + recorder.record(Util.extractStatus(status), tableId, zone, cluster); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java new file mode 100644 index 0000000000..3583733a45 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java @@ -0,0 +1,66 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.stats.StatsRecorderWrapper; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +/** + * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer} + * api. + */ +@InternalApi("For internal use only") +public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory { + + private final ImmutableMap statsAttributes; + private final StatsWrapper statsWrapper; + private final StatsRecorderWrapper statsRecorderWrapper; + + public static BuiltinMetricsTracerFactory create( + StatsWrapper statsWrapper, ImmutableMap statsAttributes) { + return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, null); + } + + @VisibleForTesting + static BuiltinMetricsTracerFactory create( + StatsWrapper statsWrapper, + ImmutableMap statsAttributes, + StatsRecorderWrapper recorder) { + return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, recorder); + } + + private BuiltinMetricsTracerFactory( + StatsWrapper statsWrapper, + ImmutableMap statsAttributes, + StatsRecorderWrapper recorder) { + this.statsAttributes = statsAttributes; + this.statsWrapper = statsWrapper; + this.statsRecorderWrapper = recorder; + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + return new BuiltinMetricsTracer( + operationType, spanName, statsAttributes, statsWrapper, statsRecorderWrapper); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 5f4580743b..29c16baafb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -92,9 +92,14 @@ public void connectionSelected(String id) { @Override public void attemptStarted(int attemptNumber) { + attemptStarted(null, attemptNumber); + } + + @Override + public void attemptStarted(Object request, int attemptNumber) { this.attempt = attemptNumber; for (ApiTracer child : children) { - child.attemptStarted(attemptNumber); + child.attemptStarted(request, attemptNumber); } } @@ -185,4 +190,18 @@ public void batchRequestThrottled(long throttledTimeMs) { tracer.batchRequestThrottled(throttledTimeMs); } } + + @Override + public void setLocations(String zone, String cluster) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.setLocations(zone, cluster); + } + } + + @Override + public void onRequest() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.onRequest(); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index f28b07c0cb..d27941bb71 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -118,7 +118,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { TagContextBuilder tagCtx = newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); measures.record(tagCtx.build()); } @@ -171,7 +173,9 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) { TagContextBuilder tagCtx = newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); measures.record(tagCtx.build()); } @@ -222,7 +226,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa } measures.record( newTagCtxBuilder() - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)) + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable))) .build()); } @@ -248,4 +253,14 @@ private TagContextBuilder newTagCtxBuilder() { return tagCtx; } + + @Override + public void setLocations(String zone, String cluster) { + // noop + } + + @Override + public void onRequest() { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index 00995b717a..0440029027 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -15,10 +15,19 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; +import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.TableName; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.grpc.Metadata; import io.grpc.Status; @@ -38,7 +47,8 @@ import javax.annotation.Nullable; /** Utilities to help integrating with OpenCensus. */ -class Util { +@InternalApi("For internal use only") +public class Util { static final Metadata.Key ATTEMPT_HEADER_KEY = Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER); static final Metadata.Key ATTEMPT_EPOCH_KEY = @@ -48,14 +58,14 @@ class Util { Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); - private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString()); + static final String TRAILER_KEY = "x-goog-ext-425905942-bin"; - /** Convert an exception into a value that can be used as an OpenCensus tag value. */ - static TagValue extractStatus(@Nullable Throwable error) { + /** Convert an exception into a value that can be used to create an OpenCensus tag value. */ + static String extractStatus(@Nullable Throwable error) { final String statusString; if (error == null) { - return OK_STATUS; + return StatusCode.Code.OK.toString(); } else if (error instanceof CancellationException) { statusString = Status.Code.CANCELLED.toString(); } else if (error instanceof ApiException) { @@ -68,14 +78,14 @@ static TagValue extractStatus(@Nullable Throwable error) { statusString = Code.UNKNOWN.toString(); } - return TagValue.create(statusString); + return statusString; } /** * Await the result of the future and convert it into a value that can be used as an OpenCensus * tag value. */ - static TagValue extractStatus(Future future) { + static TagValue extractStatusFromFuture(Future future) { Throwable error = null; try { @@ -88,7 +98,25 @@ static TagValue extractStatus(Future future) { } catch (RuntimeException e) { error = e; } - return extractStatus(error); + return TagValue.create(extractStatus(error)); + } + + static String extractTableId(Object request) { + String tableName = null; + if (request instanceof ReadRowsRequest) { + tableName = ((ReadRowsRequest) request).getTableName(); + } else if (request instanceof MutateRowsRequest) { + tableName = ((MutateRowsRequest) request).getTableName(); + } else if (request instanceof MutateRowRequest) { + tableName = ((MutateRowRequest) request).getTableName(); + } else if (request instanceof SampleRowKeysRequest) { + tableName = ((SampleRowKeysRequest) request).getTableName(); + } else if (request instanceof CheckAndMutateRowRequest) { + tableName = ((CheckAndMutateRowRequest) request).getTableName(); + } else if (request instanceof ReadModifyWriteRowRequest) { + tableName = ((ReadModifyWriteRowRequest) request).getTableName(); + } + return !Strings.isNullOrEmpty(tableName) ? TableName.parse(tableName).getTable() : "undefined"; } /** diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java new file mode 100644 index 0000000000..3a6f64a124 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -0,0 +1,341 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.api.client.util.Lists; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceHelper; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.stats.StatsRecorderWrapper; +import com.google.cloud.bigtable.stats.StatsWrapper; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.threeten.bp.Duration; + +public class BuiltinMetricsTracerTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + private static final String UNDEFINED = "undefined"; + private static final long FAKE_SERVER_TIMING = 50; + private static final long SERVER_LATENCY = 500; + + private final AtomicInteger rpcCount = new AtomicInteger(0); + + private static final ReadRowsResponse READ_ROWS_RESPONSE_1 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-1")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + private static final ReadRowsResponse READ_ROWS_RESPONSE_2 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-2")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + private static final ReadRowsResponse READ_ROWS_RESPONSE_3 = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key-3")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private FakeServiceHelper serviceHelper; + + private BigtableGrpc.BigtableImplBase mockService; + + private EnhancedBigtableStub stub; + + private StatsRecorderWrapper statsRecorderWrapper; + private ArgumentCaptor longValue; + private ArgumentCaptor intValue; + private ArgumentCaptor status; + private ArgumentCaptor tableId; + private ArgumentCaptor zone; + private ArgumentCaptor cluster; + + private Stopwatch serverRetryDelayStopwatch; + private AtomicLong serverTotalRetryDelay; + + @Before + public void setUp() throws Exception { + mockService = new FakeService(); + + serverRetryDelayStopwatch = Stopwatch.createUnstarted(); + serverTotalRetryDelay = new AtomicLong(0); + + // Add an interceptor to send location information in the trailers and add server-timing in + // headers + ServerInterceptor trailersInterceptor = + new ServerInterceptor() { + private AtomicInteger count = new AtomicInteger(0); + + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING)); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + super.close(status, trailers); + } + }, + metadata); + } + }; + + serviceHelper = new FakeServiceHelper(trailersInterceptor, mockService); + + statsRecorderWrapper = Mockito.mock(StatsRecorderWrapper.class); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + EnhancedBigtableStubSettings.Builder stubSettingsBuilder = + settings.getStubSettings().toBuilder(); + stubSettingsBuilder + .mutateRowSettings() + .retrySettings() + .setInitialRetryDelay(Duration.ofMillis(200)); + stubSettingsBuilder.setTracerFactory( + BuiltinMetricsTracerFactory.create( + StatsWrapper.get(), ImmutableMap.of(), statsRecorderWrapper)); + + EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build(); + stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); + + serviceHelper.start(); + + longValue = ArgumentCaptor.forClass(long.class); + intValue = ArgumentCaptor.forClass(int.class); + status = ArgumentCaptor.forClass(String.class); + tableId = ArgumentCaptor.forClass(String.class); + zone = ArgumentCaptor.forClass(String.class); + cluster = ArgumentCaptor.forClass(String.class); + } + + @After + public void tearDown() { + stub.close(); + serviceHelper.shutdown(); + } + + @Test + public void testOperationLatencies() { + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator()); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + verify(statsRecorderWrapper).putOperationLatencies(longValue.capture()); + + assertThat(longValue.getValue()).isIn(Range.closed(SERVER_LATENCY, elapsed)); + } + + @Test + public void testGfeMetrics() { + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + verify(statsRecorderWrapper).putGfeLatencies(longValue.capture()); + assertThat(longValue.getValue()).isEqualTo(FAKE_SERVER_TIMING); + + verify(statsRecorderWrapper).putGfeMissingHeaders(longValue.capture()); + assertThat(longValue.getValue()).isEqualTo(0); + } + + @Test + public void testReadRowsApplicationLatency() throws Exception { + final long applicationLatency = 1000; + final SettableApiFuture future = SettableApiFuture.create(); + final AtomicInteger counter = new AtomicInteger(0); + // We want to measure how long application waited before requesting another message after + // the previous message is returned from the server. Using ResponseObserver here so that the + // flow will be + // onResponse() -> sleep -> onRequest() (for the next message) which is exactly what we want to + // measure for + // application latency. + // If we do readRowsCallable().call(Query.create(TABLE_ID)).iterator() and iterate through the + // iterator and sleep in + // between responses, when the call started, the client will pre-fetch the first response, which + // won't be counted + // in application latency. So the test will be flaky and hard to debug. + stub.readRowsCallable() + .call( + Query.create(TABLE_ID), + new ResponseObserver() { + @Override + public void onStart(StreamController streamController) {} + + @Override + public void onResponse(Row row) { + try { + counter.incrementAndGet(); + Thread.sleep(applicationLatency); + } catch (InterruptedException e) { + } + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onComplete() { + future.set(null); + } + }); + future.get(); + + verify(statsRecorderWrapper).putApplicationLatencies(longValue.capture()); + + assertThat(counter.get()).isGreaterThan(0); + assertThat(longValue.getValue()).isAtLeast(applicationLatency * counter.get()); + assertThat(longValue.getValue()).isLessThan(applicationLatency * (counter.get() + 1)); + } + + @Test + public void testRetryCount() { + stub.mutateRowCallable() + .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); + + verify(statsRecorderWrapper).putRetryCount(intValue.capture()); + + assertThat(intValue.getValue()).isEqualTo(3); + } + + @Test + public void testMutateRowAttempts() { + stub.mutateRowCallable() + .call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value")); + + verify(statsRecorderWrapper, times(4)) + .record(status.capture(), tableId.capture(), zone.capture(), cluster.capture()); + assertThat(zone.getAllValues().get(0)).isEqualTo(UNDEFINED); + assertThat(zone.getAllValues().get(1)).isEqualTo(UNDEFINED); + assertThat(zone.getAllValues().get(2)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(0)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(1)).isEqualTo(UNDEFINED); + assertThat(cluster.getAllValues().get(2)).isEqualTo(UNDEFINED); + assertThat(status.getAllValues().get(0)).isEqualTo("UNAVAILABLE"); + assertThat(status.getAllValues().get(1)).isEqualTo("UNAVAILABLE"); + assertThat(status.getAllValues().get(2)).isEqualTo("OK"); + } + + private class FakeService extends BigtableGrpc.BigtableImplBase { + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + try { + Thread.sleep(SERVER_LATENCY); + } catch (InterruptedException e) { + } + responseObserver.onNext(READ_ROWS_RESPONSE_1); + responseObserver.onNext(READ_ROWS_RESPONSE_2); + responseObserver.onNext(READ_ROWS_RESPONSE_3); + responseObserver.onCompleted(); + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + if (serverRetryDelayStopwatch.isRunning()) { + serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS)); + serverRetryDelayStopwatch.reset(); + } + if (rpcCount.get() < 2) { + responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + rpcCount.getAndIncrement(); + serverRetryDelayStopwatch.start(); + return; + } + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index 69a741d0e3..0de14636c6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -23,6 +23,7 @@ import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracer.Scope; +import com.google.bigtable.v2.ReadRowsRequest; import com.google.cloud.bigtable.misc_utilities.MethodComparator; import com.google.common.collect.ImmutableList; import io.grpc.Status; @@ -118,11 +119,12 @@ public void testConnectionSelected() { @Test public void testAttemptStarted() { - compositeTracer.attemptStarted(3); - verify(child1, times(1)).attemptStarted(3); - verify(child2, times(1)).attemptStarted(3); - verify(child3, times(1)).attemptStarted(3); - verify(child4, times(1)).attemptStarted(3); + ReadRowsRequest request = ReadRowsRequest.getDefaultInstance(); + compositeTracer.attemptStarted(request, 3); + verify(child1, times(1)).attemptStarted(request, 3); + verify(child2, times(1)).attemptStarted(request, 3); + verify(child3, times(1)).attemptStarted(request, 3); + verify(child4, times(1)).attemptStarted(request, 3); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java index efef3b67d2..3c0fb4e617 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java @@ -30,13 +30,13 @@ public class UtilTest { @Test public void testOk() { - TagValue tagValue = Util.extractStatus((Throwable) null); + TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null)); assertThat(tagValue.asString()).isEqualTo("OK"); } @Test public void testOkFuture() { - TagValue tagValue = Util.extractStatus(Futures.immediateFuture(null)); + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFuture(null)); assertThat(tagValue.asString()).isEqualTo("OK"); } @@ -45,7 +45,7 @@ public void testError() { DeadlineExceededException error = new DeadlineExceededException( "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); - TagValue tagValue = Util.extractStatus(error); + TagValue tagValue = TagValue.create(Util.extractStatus(error)); assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); } @@ -54,13 +54,13 @@ public void testErrorFuture() { DeadlineExceededException error = new DeadlineExceededException( "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); - TagValue tagValue = Util.extractStatus(Futures.immediateFailedFuture(error)); + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFailedFuture(error)); assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); } @Test public void testCancelledFuture() { - TagValue tagValue = Util.extractStatus(Futures.immediateCancelledFuture()); + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateCancelledFuture()); assertThat(tagValue.asString()).isEqualTo("CANCELLED"); } } diff --git a/pom.xml b/pom.xml index dbfddb8ee7..7bc6bccfc6 100644 --- a/pom.xml +++ b/pom.xml @@ -212,6 +212,15 @@ + + + + + + + + +