diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 48fc7dcce8..cf4c850e62 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -50,8 +50,14 @@ class BuiltinMetricsTracer extends BigtableTracer { private volatile int attempt = 0; // Total application latency - private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted(); + // Total application latency needs to be atomic because it's accessed from different threads. E.g. + // request() from + // user thread and attempt failed from grpc thread. private final AtomicLong totalApplicationLatency = new AtomicLong(0); + // Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is + // flushed to memory. + private final AtomicBoolean applicationLatencyTimerIsRunning = new AtomicBoolean(); + private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted(); // Monitored resource labels private String tableId = "undefined"; @@ -113,7 +119,7 @@ public void attemptStarted(Object request, int attemptNumber) { if (request != null) { this.tableId = Util.extractTableId(request); } - if (applicationLatencyTimer.isRunning()) { + if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) { totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); applicationLatencyTimer.reset(); } @@ -131,7 +137,7 @@ public void attemptCancelled() { @Override public void attemptFailed(Throwable error, Duration delay) { - if (!applicationLatencyTimer.isRunning()) { + if (applicationLatencyTimerIsRunning.compareAndSet(false, true)) { applicationLatencyTimer.start(); } recordAttemptCompletion(error); @@ -159,7 +165,7 @@ public void lroStartSucceeded() { @Override public void onRequest() { - if (applicationLatencyTimer.isRunning()) { + if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) { totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); applicationLatencyTimer.reset(); } @@ -167,7 +173,7 @@ public void onRequest() { @Override public void responseReceived() { - if (!applicationLatencyTimer.isRunning()) { + if (applicationLatencyTimerIsRunning.compareAndSet(false, true)) { applicationLatencyTimer.start(); } if (firstResponsePerOpTimer.isRunning()) { @@ -221,7 +227,7 @@ private void recordOperationCompletion(@Nullable Throwable status) { recorder.putRetryCount(attemptCount); - if (applicationLatencyTimer.isRunning()) { + if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) { applicationLatencyTimer.stop(); totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java index 31c5cf1960..8f5c9871b4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java @@ -82,7 +82,8 @@ private class HeaderTracerResponseObserver implements ResponseObserve @Override public void onStart(final StreamController controller) { - outerObserver.onStart(controller); + final StreamController tracedController = new TracedStreamController(controller, tracer); + outerObserver.onStart(tracedController); } @Override @@ -108,4 +109,30 @@ public void onComplete() { outerObserver.onComplete(); } } + + private class TracedStreamController implements StreamController { + private final StreamController innerController; + private final BigtableTracer tracer; + + TracedStreamController(StreamController innerController, BigtableTracer tracer) { + this.innerController = innerController; + this.tracer = tracer; + } + + @Override + public void cancel() { + innerController.cancel(); + } + + @Override + public void disableAutoInboundFlowControl() { + innerController.disableAutoInboundFlowControl(); + } + + @Override + public void request(int i) { + tracer.onRequest(); + innerController.request(i); + } + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index 21d4c626f9..5e62c44748 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -55,7 +55,6 @@ import io.grpc.stub.StreamObserver; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -128,16 +127,10 @@ public class BuiltinMetricsTracerTest { @Captor private ArgumentCaptor zone; @Captor private ArgumentCaptor cluster; - private Stopwatch serverRetryDelayStopwatch; - private AtomicLong serverTotalRetryDelay; - @Before public void setUp() throws Exception { mockService = new FakeService(); - serverRetryDelayStopwatch = Stopwatch.createUnstarted(); - serverTotalRetryDelay = new AtomicLong(0); - // Add an interceptor to add server-timing in headers ServerInterceptor trailersInterceptor = new ServerInterceptor() { @@ -305,14 +298,9 @@ public void readRows( @Override public void mutateRow( MutateRowRequest request, StreamObserver responseObserver) { - if (serverRetryDelayStopwatch.isRunning()) { - serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS)); - serverRetryDelayStopwatch.reset(); - } if (rpcCount.get() < 2) { responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); rpcCount.getAndIncrement(); - serverRetryDelayStopwatch.start(); return; } responseObserver.onNext(MutateRowResponse.getDefaultInstance());