Skip to content

Commit

Permalink
make stopwatch thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 9, 2022
1 parent 82e63f0 commit c7290fa
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ class BuiltinMetricsTracer extends BigtableTracer {
private volatile int attempt = 0;

// Total application latency
private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted();
// Total application latency needs to be atomic because it's accessed from different threads. E.g.
// request() from
// user thread and attempt failed from grpc thread.
private final AtomicLong totalApplicationLatency = new AtomicLong(0);
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is
// flushed to memory.
private final AtomicBoolean applicationLatencyTimerIsRunning = new AtomicBoolean();
private final Stopwatch applicationLatencyTimer = Stopwatch.createUnstarted();

// Monitored resource labels
private String tableId = "undefined";
Expand Down Expand Up @@ -113,7 +119,7 @@ public void attemptStarted(Object request, int attemptNumber) {
if (request != null) {
this.tableId = Util.extractTableId(request);
}
if (applicationLatencyTimer.isRunning()) {
if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) {
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
applicationLatencyTimer.reset();
}
Expand All @@ -131,7 +137,7 @@ public void attemptCancelled() {

@Override
public void attemptFailed(Throwable error, Duration delay) {
if (!applicationLatencyTimer.isRunning()) {
if (applicationLatencyTimerIsRunning.compareAndSet(false, true)) {
applicationLatencyTimer.start();
}
recordAttemptCompletion(error);
Expand Down Expand Up @@ -159,15 +165,15 @@ public void lroStartSucceeded() {

@Override
public void onRequest() {
if (applicationLatencyTimer.isRunning()) {
if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) {
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
applicationLatencyTimer.reset();
}
}

@Override
public void responseReceived() {
if (!applicationLatencyTimer.isRunning()) {
if (applicationLatencyTimerIsRunning.compareAndSet(false, true)) {
applicationLatencyTimer.start();
}
if (firstResponsePerOpTimer.isRunning()) {
Expand Down Expand Up @@ -221,7 +227,7 @@ private void recordOperationCompletion(@Nullable Throwable status) {

recorder.putRetryCount(attemptCount);

if (applicationLatencyTimer.isRunning()) {
if (applicationLatencyTimerIsRunning.compareAndSet(true, false)) {
applicationLatencyTimer.stop();
totalApplicationLatency.addAndGet(applicationLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ private class HeaderTracerResponseObserver<ResponseT> implements ResponseObserve

@Override
public void onStart(final StreamController controller) {
outerObserver.onStart(controller);
final StreamController tracedController = new TracedStreamController(controller, tracer);
outerObserver.onStart(tracedController);
}

@Override
Expand All @@ -108,4 +109,30 @@ public void onComplete() {
outerObserver.onComplete();
}
}

private class TracedStreamController implements StreamController {
private final StreamController innerController;
private final BigtableTracer tracer;

TracedStreamController(StreamController innerController, BigtableTracer tracer) {
this.innerController = innerController;
this.tracer = tracer;
}

@Override
public void cancel() {
innerController.cancel();
}

@Override
public void disableAutoInboundFlowControl() {
innerController.disableAutoInboundFlowControl();
}

@Override
public void request(int i) {
tracer.onRequest();
innerController.request(i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -128,16 +127,10 @@ public class BuiltinMetricsTracerTest {
@Captor private ArgumentCaptor<String> zone;
@Captor private ArgumentCaptor<String> cluster;

private Stopwatch serverRetryDelayStopwatch;
private AtomicLong serverTotalRetryDelay;

@Before
public void setUp() throws Exception {
mockService = new FakeService();

serverRetryDelayStopwatch = Stopwatch.createUnstarted();
serverTotalRetryDelay = new AtomicLong(0);

// Add an interceptor to add server-timing in headers
ServerInterceptor trailersInterceptor =
new ServerInterceptor() {
Expand Down Expand Up @@ -305,14 +298,9 @@ public void readRows(
@Override
public void mutateRow(
MutateRowRequest request, StreamObserver<MutateRowResponse> responseObserver) {
if (serverRetryDelayStopwatch.isRunning()) {
serverTotalRetryDelay.addAndGet(serverRetryDelayStopwatch.elapsed(TimeUnit.MILLISECONDS));
serverRetryDelayStopwatch.reset();
}
if (rpcCount.get() < 2) {
responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
rpcCount.getAndIncrement();
serverRetryDelayStopwatch.start();
return;
}
responseObserver.onNext(MutateRowResponse.getDefaultInstance());
Expand Down

0 comments on commit c7290fa

Please sign in to comment.