Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 21, 2022
1 parent 35bb7fa commit 67a79da
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -154,7 +153,6 @@ public static EnhancedBigtableStubSettings finalizeSettings(
EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats)
throws IOException {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();
StatsWrapper statsWrapper = StatsWrapper.create();

// TODO: this implementation is on the cusp of unwieldy, if we end up adding more features
// consider splitting it up by feature.
Expand Down Expand Up @@ -227,7 +225,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
BuiltinMetricsTracerFactory.create(statsWrapper, builtinAttributes),
BuiltinMetricsTracerFactory.create(builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracerFactory;
import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;

import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -36,7 +36,7 @@ class BuiltinMetricsTracer extends BigtableTracer {

private final StatsRecorderWrapper recorder;

private final ApiTracerFactory.OperationType operationType;
private final OperationType operationType;
private final SpanName spanName;

// Operation level metrics
Expand All @@ -51,8 +51,7 @@ class BuiltinMetricsTracer extends BigtableTracer {

// Total application latency
// Total application latency needs to be atomic because it's accessed from different threads. E.g.
// request() from
// user thread and attempt failed from grpc thread.
// request() from user thread and attempt failed from grpc thread.
private final AtomicLong totalApplicationLatency = new AtomicLong(0);
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is
// flushed to memory.
Expand All @@ -67,20 +66,12 @@ class BuiltinMetricsTracer extends BigtableTracer {
// gfe stats
private AtomicLong gfeMissingHeaders = new AtomicLong(0);

@VisibleForTesting
BuiltinMetricsTracer(
ApiTracerFactory.OperationType operationType,
SpanName spanName,
Map<String, String> attributes,
StatsWrapper statsWrapper,
@Nullable StatsRecorderWrapper statsRecorderWrapper) {
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
this.operationType = operationType;
this.spanName = spanName;
if (statsRecorderWrapper != null) {
// A workaround for test to pass in a mock StatsRecorderWrapper
this.recorder = statsRecorderWrapper;
} else {
this.recorder = new StatsRecorderWrapper(operationType, spanName, attributes, statsWrapper);
}
this.recorder = recorder;
}

@Override
Expand Down Expand Up @@ -143,26 +134,6 @@ public void attemptFailed(Throwable error, Duration delay) {
recordAttemptCompletion(error);
}

@Override
public void attemptFailedRetriesExhausted(Throwable error) {
super.attemptFailedRetriesExhausted(error);
}

@Override
public void attemptPermanentFailure(Throwable error) {
super.attemptPermanentFailure(error);
}

@Override
public void lroStartFailed(Throwable error) {
super.lroStartFailed(error);
}

@Override
public void lroStartSucceeded() {
super.lroStartSucceeded();
}

@Override
public void onRequest() {
if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) {
Expand All @@ -181,16 +152,6 @@ public void responseReceived() {
}
}

@Override
public void requestSent() {
super.requestSent();
}

@Override
public void batchRequestSent(long elementCount, long requestSize) {
super.batchRequestSent(elementCount, requestSize);
}

@Override
public int getAttempt() {
return attempt;
Expand Down Expand Up @@ -235,7 +196,7 @@ private void recordOperationCompletion(@Nullable Throwable status) {

recorder.putOperationLatencies(operationTimer.elapsed(TimeUnit.MILLISECONDS));

if (operationType == ApiTracerFactory.OperationType.ServerStreaming
if (operationType == OperationType.ServerStreaming
&& spanName.getMethodName().equals("ReadRows")) {
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

/**
Expand All @@ -33,35 +31,20 @@
public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {

private final ImmutableMap<String, String> statsAttributes;
private final StatsWrapper statsWrapper;
private final StatsRecorderWrapper statsRecorderWrapper;

public static BuiltinMetricsTracerFactory create(
StatsWrapper statsWrapper, ImmutableMap<String, String> statsAttributes) {
return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, null);
public static BuiltinMetricsTracerFactory create(ImmutableMap<String, String> statsAttributes) {
return new BuiltinMetricsTracerFactory(statsAttributes);
}

// A workaround for test to pass in a mock StatsRecorderWrapper
@VisibleForTesting
static BuiltinMetricsTracerFactory createWithRecorder(
StatsWrapper statsWrapper,
ImmutableMap<String, String> statsAttributes,
StatsRecorderWrapper statsRecorderWrapper) {
return new BuiltinMetricsTracerFactory(statsWrapper, statsAttributes, statsRecorderWrapper);
}

private BuiltinMetricsTracerFactory(
StatsWrapper statsWrapper,
ImmutableMap<String, String> statsAttributes,
StatsRecorderWrapper statsRecorderWrapper) {
private BuiltinMetricsTracerFactory(ImmutableMap<String, String> statsAttributes) {
this.statsAttributes = statsAttributes;
this.statsWrapper = statsWrapper;
this.statsRecorderWrapper = statsRecorderWrapper;
}

@Override
public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
return new BuiltinMetricsTracer(
operationType, spanName, statsAttributes, statsWrapper, statsRecorderWrapper);
operationType,
spanName,
StatsWrapper.createRecorder(operationType, spanName, statsAttributes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private class HeaderTracerResponseObserver<ResponseT> implements ResponseObserve

@Override
public void onStart(final StreamController controller) {
final StreamController tracedController = new TracedStreamController(controller, tracer);
StreamController tracedController = new TracedStreamController(controller, tracer);
outerObserver.onStart(tracedController);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
measures.record(tagCtx.build());
}

@Override
public void connectionSelected(String s) {
// noop: cardinality for connection ids is too high to use as tags
}

@Override
public void attemptStarted(int attemptNumber) {
attempt = attemptNumber;
Expand Down Expand Up @@ -180,16 +175,6 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) {
measures.record(tagCtx.build());
}

@Override
public void lroStartFailed(Throwable throwable) {
// noop
}

@Override
public void lroStartSucceeded() {
// noop
}

@Override
public void responseReceived() {
if (firstResponsePerOpTimer.isRunning()) {
Expand All @@ -199,16 +184,6 @@ public void responseReceived() {
operationResponseCount++;
}

@Override
public void requestSent() {
// noop: no operations are client streaming
}

@Override
public void batchRequestSent(long elementCount, long requestSize) {
// noop
}

@Override
public int getAttempt() {
return attempt;
Expand Down Expand Up @@ -253,14 +228,4 @@ private TagContextBuilder newTagCtxBuilder() {

return tagCtx;
}

@Override
public void setLocations(String zone, String cluster) {
// noop
}

@Override
public void onRequest() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.client.util.Lists;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.tracing.SpanName;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
Expand All @@ -37,9 +41,7 @@
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
Expand All @@ -59,13 +61,16 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class BuiltinMetricsTracerTest {
private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
Expand Down Expand Up @@ -118,6 +123,7 @@ public class BuiltinMetricsTracerTest {

private EnhancedBigtableStub stub;

@Mock private BuiltinMetricsTracerFactory mockFactory;
@Mock private StatsRecorderWrapper statsRecorderWrapper;

@Captor private ArgumentCaptor<Long> longValue;
Expand Down Expand Up @@ -169,9 +175,7 @@ public void sendHeaders(Metadata headers) {
.mutateRowSettings()
.retrySettings()
.setInitialRetryDelay(Duration.ofMillis(200));
stubSettingsBuilder.setTracerFactory(
BuiltinMetricsTracerFactory.createWithRecorder(
StatsWrapper.create(), ImmutableMap.of(), statsRecorderWrapper));
stubSettingsBuilder.setTracerFactory(mockFactory);

EnhancedBigtableStubSettings stubSettings = stubSettingsBuilder.build();
stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
Expand All @@ -185,6 +189,12 @@ public void tearDown() {

@Test
public void testOperationLatencies() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.ServerStreaming,
SpanName.of("Bigtable", "ReadRows"),
statsRecorderWrapper));
Stopwatch stopwatch = Stopwatch.createStarted();
Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator());
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Expand All @@ -196,6 +206,13 @@ public void testOperationLatencies() {

@Test
public void testGfeMetrics() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.ServerStreaming,
SpanName.of("Bigtable", "ReadRows"),
statsRecorderWrapper));

Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));

verify(statsRecorderWrapper).putGfeLatencies(longValue.capture());
Expand All @@ -207,6 +224,13 @@ public void testGfeMetrics() {

@Test
public void testReadRowsApplicationLatency() throws Exception {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.ServerStreaming,
SpanName.of("Bigtable", "ReadRows"),
statsRecorderWrapper));

final long applicationLatency = 1000;
final SettableApiFuture future = SettableApiFuture.create();
final AtomicInteger counter = new AtomicInteger(0);
Expand Down Expand Up @@ -258,6 +282,11 @@ public void onComplete() {

@Test
public void testRetryCount() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper));

stub.mutateRowCallable()
.call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));

Expand All @@ -268,6 +297,11 @@ public void testRetryCount() {

@Test
public void testMutateRowAttempts() {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
OperationType.Unary, SpanName.of("Bigtable", "MutateRow"), statsRecorderWrapper));

stub.mutateRowCallable()
.call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));

Expand Down
Loading

0 comments on commit 67a79da

Please sign in to comment.