feat: update tracers to use built in metrics
mutianf committed May 11, 2022
1 parent 3655af2 commit ca28be6
Showing 16 changed files with 964 additions and 100 deletions.
10 changes: 10 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
@@ -61,4 +61,14 @@
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable</className>
<method>*</method>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable</className>
</difference>
</differences>
9 changes: 9 additions & 0 deletions google-cloud-bigtable/pom.xml
@@ -54,13 +54,22 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
<version>2.6.3-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigtable:current} -->
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- NOTE: Dependencies are organized into two groups, production and test.
Within a group, dependencies are sorted by (groupId, artifactId) -->

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
</dependency>
<!-- Production dependencies -->
<dependency>
<groupId>com.google.api</groupId>
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -70,9 +70,10 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
@@ -89,6 +90,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -151,6 +153,16 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
public static EnhancedBigtableStubSettings finalizeSettings(
EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats)
throws IOException {
StatsWrapper builtinMetricsWrapper = new StatsWrapper(false);
return finalizeSettings(settings, tagger, stats, builtinMetricsWrapper);
}

private static EnhancedBigtableStubSettings finalizeSettings(
EnhancedBigtableStubSettings settings,
Tagger tagger,
StatsRecorder stats,
StatsWrapper builtinMetricsWrapper)
throws IOException {
EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();

// TODO: this implementation is on the cusp of unwieldy, if we end up adding more features
@@ -194,6 +206,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", settings.getProjectId())
.put("instance_id", settings.getInstanceId())
.put("app_profile", settings.getAppProfileId())
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
@@ -218,6 +236,7 @@
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
BuiltinMetricsTracerFactory.create(builtinMetricsWrapper, builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
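The existing OpenCensus tracing/metrics factories, the new BuiltinMetricsTracerFactory, and any user-configured factory are stacked behind a single CompositeTracerFactory, so each RPC gets one logical tracer that fans out to all of them. The composite's own source is not part of this diff; the class below is only an illustrative sketch of that fan-out pattern (FanOutTracerFactory and FanOutTracer are made-up names, and only two ApiTracer callbacks are forwarded to keep it short):

    import com.google.api.gax.tracing.ApiTracer;
    import com.google.api.gax.tracing.ApiTracerFactory;
    import com.google.api.gax.tracing.BaseApiTracer;
    import com.google.api.gax.tracing.SpanName;
    import com.google.common.collect.ImmutableList;
    import java.util.List;

    final class FanOutTracerFactory implements ApiTracerFactory {
      private final List<ApiTracerFactory> factories;

      FanOutTracerFactory(List<ApiTracerFactory> factories) {
        this.factories = ImmutableList.copyOf(factories);
      }

      @Override
      public ApiTracer newTracer(
          ApiTracer parent, SpanName spanName, ApiTracerFactory.OperationType operationType) {
        // Ask every child factory for a tracer for this operation.
        ImmutableList.Builder<ApiTracer> children = ImmutableList.builder();
        for (ApiTracerFactory factory : factories) {
          children.add(factory.newTracer(parent, spanName, operationType));
        }
        return new FanOutTracer(children.build());
      }

      // Forwards each callback to every child tracer; a real composite forwards all of them.
      private static final class FanOutTracer extends BaseApiTracer {
        private final List<ApiTracer> children;

        FanOutTracer(List<ApiTracer> children) {
          this.children = children;
        }

        @Override
        public void attemptStarted(int attemptNumber) {
          for (ApiTracer child : children) {
            child.attemptStarted(attemptNumber);
          }
        }

        @Override
        public void operationSucceeded() {
          for (ApiTracer child : children) {
            child.operationSucceeded();
          }
        }
      }
    }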
@@ -369,7 +388,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Add a BigtableTracer callable for tracking Bigtable-specific metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
@@ -420,13 +439,13 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(watched);
ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
@@ -465,11 +484,11 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
UnaryCallable<Query, List<RowT>> tracedBatcher =
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(tracedBatcher);
UnaryCallable<Query, List<RowT>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -511,11 +530,11 @@ public Map<String, String> extract(
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withStatsHeaders =
new StatsHeadersUnaryCallable<>(spoolable);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
@@ -550,11 +569,11 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
@@ -594,13 +613,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcher);
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withBigtableTracer =
new BigtableTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);
new TracedUnaryCallable<>(withBigtableTracer, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -738,11 +757,11 @@ public Map<String, String> extract(
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
@@ -779,11 +798,12 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new StatsHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
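Each unary chain above keeps the same decoration order — raw gRPC callable, then StatsHeadersUnaryCallable, then the tracer wrapper, then retries — and this commit only swaps HeaderTracerUnaryCallable for BigtableTracerUnaryCallable at the tracer step. BigtableTracerUnaryCallable's source is not part of this excerpt; the sketch below shows the general shape such a wrapper plausibly has, mirroring the streaming variant later in this commit (the class name is made up, and the completion callback is only described in a comment):

    import com.google.api.core.ApiFuture;
    import com.google.api.gax.grpc.GrpcResponseMetadata;
    import com.google.api.gax.rpc.ApiCallContext;
    import com.google.api.gax.rpc.UnaryCallable;
    import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;

    final class TracingUnaryCallableSketch<RequestT, ResponseT>
        extends UnaryCallable<RequestT, ResponseT> {

      private final UnaryCallable<RequestT, ResponseT> inner;

      TracingUnaryCallableSketch(UnaryCallable<RequestT, ResponseT> inner) {
        this.inner = inner;
      }

      @Override
      public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
        // Only decorate the call when the per-call tracer is a BigtableTracer.
        if (context.getTracer() instanceof BigtableTracer) {
          GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
          // The real implementation also registers a future callback that reads the
          // server-timing header from responseMetadata and reports it through
          // BigtableTracer#recordGfeMetadata once the call completes.
          return inner.futureCall(request, responseMetadata.addHandlers(context));
        }
        return inner.futureCall(request, context);
      }
    }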
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;
@@ -35,6 +35,11 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/** Annotate when onRequest is called, so the tracer can track message requests on a stream. */
public void onRequest() {
// noop
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -57,4 +62,9 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}

/** Set the Bigtable zone and cluster so metrics can be tagged with location information. */
public void setLocations(String zone, String cluster) {
// noop
}
}
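BigtableTracer remains a no-op base class; concrete tracers override only the hooks they need. The built-in metrics tracer this commit wires in through BuiltinMetricsTracerFactory is not part of this excerpt, so the subclass below is just a minimal sketch of how the new onRequest() and setLocations() hooks could be consumed (the class name and the fields it records into are placeholders):

    import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
    import java.util.concurrent.atomic.AtomicLong;

    public class LocationAwareTracerSketch extends BigtableTracer {

      private final AtomicLong messageRequestCount = new AtomicLong();
      private volatile String zone = "global";
      private volatile String cluster = "unspecified";

      @Override
      public void onRequest() {
        // Invoked by the traced stream controller each time the caller requests more messages.
        messageRequestCount.incrementAndGet();
      }

      @Override
      public void setLocations(String zone, String cluster) {
        // Populated from the response trailers so metrics can be tagged per zone/cluster.
        this.zone = zone;
        this.cluster = cluster;
      }

      @Override
      public void recordGfeMetadata(Long latency, Throwable throwable) {
        // A real implementation would record the GFE latency distribution here, tagged with
        // the zone and cluster captured above.
      }
    }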
google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -26,26 +26,28 @@
import javax.annotation.Nonnull;

/**
* This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
* returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
* were injected in the header/trailer and publish them to OpenCensus. If {@link
* GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
* reached GFE, and it'll increment the gfe_header_missing_counter in this case.
* This callable will:
*
* <p>If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
* This is for the case where direct path is enabled, all the requests won't go through GFE and
* therefore won't have the server-timing header.
* <p>- Inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <p>- Call {@link BigtableTracer#onRequest()} to record request events on a stream.
*
* <p>- Get the Bigtable zone and cluster information from the response trailers and record them in the tracer.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class HeaderTracerStreamingCallable<RequestT, ResponseT>
public class BigtableTracerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;

public HeaderTracerStreamingCallable(
public BigtableTracerStreamingCallable(
@Nonnull ServerStreamingCallable<RequestT, ResponseT> callable) {
this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set");
}
@@ -55,23 +57,23 @@ public void call(
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
// tracer should always be an instance of bigtable tracer
if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) {
HeaderTracerResponseObserver<ResponseT> innerObserver =
new HeaderTracerResponseObserver<>(
if (context.getTracer() instanceof BigtableTracer) {
BigtableTracerResponseObserver<ResponseT> innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
} else {
innerCallable.call(request, responseObserver, context);
}
}

private class HeaderTracerResponseObserver<ResponseT> implements ResponseObserver<ResponseT> {
private class BigtableTracerResponseObserver<ResponseT> implements ResponseObserver<ResponseT> {

private final BigtableTracer tracer;
private final ResponseObserver<ResponseT> outerObserver;
private final GrpcResponseMetadata responseMetadata;

HeaderTracerResponseObserver(
BigtableTracerResponseObserver(
ResponseObserver<ResponseT> observer,
BigtableTracer tracer,
GrpcResponseMetadata metadata) {
@@ -82,7 +84,8 @@ private class HeaderTracerResponseObserver<ResponseT> implements ResponseObserve

@Override
public void onStart(final StreamController controller) {
outerObserver.onStart(controller);
final StreamController tracedController = new TracedStreamController(controller, tracer);
outerObserver.onStart(tracedController);
}

@Override
@@ -97,6 +100,16 @@ public void onError(Throwable t) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
// try {
// byte[] trailers =
// responseMetadata
// .getTrailingMetadata()
// .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }

outerObserver.onError(t);
}

@@ -105,7 +118,42 @@ public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
// try {
// byte[] trailers =
// responseMetadata.getTrailingMetadata().get(Metadata.Key.of(Util.TRAILER_KEY,
// Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }

outerObserver.onComplete();
}
}

private class TracedStreamController implements StreamController {
private final StreamController innerController;
private final BigtableTracer tracer;

TracedStreamController(StreamController innerController, BigtableTracer tracer) {
this.innerController = innerController;
this.tracer = tracer;
}

@Override
public void cancel() {
innerController.cancel();
}

@Override
public void disableAutoInboundFlowControl() {
innerController.disableAutoInboundFlowControl();
}

@Override
public void request(int i) {
tracer.onRequest();
innerController.request(i);
}
}
}
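The commented-out blocks in onError and onComplete mark where the new setLocations hook is meant to be fed: the server returns a serialized ResponseParams proto in a response trailer, and its zone and cluster IDs identify where the request was served. Below is a sketch of that logic factored into a helper, written against the same calls the commented code uses; Util.TRAILER_KEY and the ResponseParams import path are assumed from that code, and the helper is presumed to sit in the same metrics package as Util and BigtableTracer:

    import com.google.api.gax.grpc.GrpcResponseMetadata;
    import com.google.bigtable.v2.ResponseParams;
    import com.google.protobuf.InvalidProtocolBufferException;
    import io.grpc.Metadata;

    final class TrailerLocationExtractorSketch {

      private TrailerLocationExtractorSketch() {}

      /**
       * Reads the serialized ResponseParams proto from the response trailers and, when present,
       * reports the zone and cluster to the tracer. Mirrors the commented-out code above.
       */
      static void recordLocation(GrpcResponseMetadata responseMetadata, BigtableTracer tracer) {
        try {
          byte[] trailers =
              responseMetadata
                  .getTrailingMetadata()
                  .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
          ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
          tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
        } catch (NullPointerException | InvalidProtocolBufferException e) {
          // Trailers can be absent (for example when the request never reached the server);
          // in that case no location is recorded.
        }
      }
    }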