Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: integrate OpenCensus tracing into the bigtable data client #4493

Merged
merged 12 commits into from
Feb 26, 2019
60 changes: 60 additions & 0 deletions google-cloud-clients/google-cloud-bigtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,66 @@ try {
}
```

## Opencensus Tracing

Cloud Bigtable client supports [Opencensus Tracing](https://opencensus.io/tracing/),
which gives insight into the client internals and aids in debugging production issues.
By default, the functionality is disabled. To enable, you need to add a couple of
dependencies and configure an exporter. For example to enable tracing using
[Google Stackdriver](https://cloud.google.com/trace/docs/):

[//]: # (TODO: figure out how to keep opencensus version in sync with pom.xml)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to ensure that the opencensus versions in this readme stay up to date with google-cloud-clients/pom.xml properties. I thought about using version markers, but those seem to only be used for internal artifacts. It would be awesome if version.txt could be generalized to manage static external versions as well.


If you are using Maven, add this to your pom.xml file
```xml
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>io.opencensus</artifactId>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
<version>0.18.0</version>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-exporter-trace-stackdriver</artifactId>
<version>0.18.0</version>
</dependency>
```
If you are using Gradle, add this to your dependencies
```Groovy
compile 'io.opencensus:opencensus-impl:0.18.0'
compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.18.0'
```
If you are using SBT, add this to your dependencies
```Scala
libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.18.0"
libraryDependencies += "io.opencensus" % "opencensus-exporter-trace-stackdriver" % "0.18.0"
```

Then at the start of your application configure the exporter:

```java
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;

StackdriverTraceExporter.createAndRegister(
StackdriverTraceConfiguration.builder()
.setProjectId("YOUR-PROJECT_ID")
.build());
```

By default traces are [sampled](https://opencensus.io/tracing/sampling) at a rate of about 1/10,000.
You can configure a higher rate by updating the active tracing params:

```java
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;

Tracing.getTraceConfig().updateActiveTraceParams(
Tracing.getTraceConfig().getActiveTraceParams().toBuilder()
.setSampler(Samplers.probabilitySampler(0.01))
.build()
);
```

## Troubleshooting

To get help, follow the instructions in the [shared Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.BatchingCallSettings;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedBatchingCallable;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.SampleRowKeysRequest;
Expand All @@ -50,6 +54,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import java.io.IOException;
import java.util.List;
import org.threeten.bp.Duration;
Expand All @@ -68,6 +73,9 @@
*/
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable";
private static final String TRACING_INNER_CLIENT_NAME = "BaseBigtable";

private final EnhancedBigtableStubSettings settings;
private final GrpcBigtableStub stub;
private final ClientContext clientContext;
Expand All @@ -92,7 +100,10 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
.setCredentialsProvider(settings.getCredentialsProvider())
.setHeaderProvider(settings.getHeaderProvider())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval());
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
// Force the base stub to use a different TracerFactory
.setTracerFactory(
new WrappedTracerFactory(settings.getTracerFactory(), TRACING_INNER_CLIENT_NAME));

// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
Expand Down Expand Up @@ -140,6 +151,9 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigtableStub stub = new GrpcBigtableStub(baseSettings, clientContext);

// Make sure to keep the original tracer factory for the outer client.
clientContext = clientContext.toBuilder().setTracerFactory(settings.getTracerFactory()).build();

return new EnhancedBigtableStub(settings, clientContext, stub);
}

Expand Down Expand Up @@ -247,15 +261,15 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
FilterMarkerRowsCallable<RowT> filtering =
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());
ReadRowsUserCallable<RowT> userFacing = new ReadRowsUserCallable<>(filtering, requestContext);

// NOTE: Ideally `withDefaultCallContext` should be the outer-most callable, however the
// ReadRowsUserCallable overrides the first() method. This override would be lost if
// ReadRowsUserCallable is wrapped by another callable. At some point in the future,
// gax-java should allow preserving these kind of overrides through callable chains, at which
// point this should be re-ordered.
return new ReadRowsUserCallable<>(withContext, requestContext);
TracedServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -279,7 +293,13 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
UnaryCallable<String, List<KeyOffset>> userFacing =
new SampleRowKeysCallable(retryable, requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryCallable<String, List<KeyOffset>> traced =
new TracedUnaryCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "SampleRowKeys"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -292,7 +312,13 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
MutateRowCallable userFacing = new MutateRowCallable(stub.mutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryCallable<RowMutation, Void> traced =
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
new TracedUnaryCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "MutateRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -311,7 +337,16 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
return new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);
BulkMutateRowsUserFacingCallable userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

TracedUnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "BulkMutateRows"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -338,8 +373,15 @@ private UnaryCallable<RowMutation, Void> createBulkMutateRowsBatchingCallable()
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
.setBatchingSettings(settings.bulkMutateRowsSettings().getBatchingSettings());

TracedBatchingCallable<MutateRowsRequest, Void> traced =
new TracedBatchingCallable<>(
baseCallable,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "BulkMutateRows"),
batchingCallSettings.getBatchingDescriptor());

UnaryCallable<MutateRowsRequest, Void> batching =
Callables.batching(baseCallable, batchingCallSettings.build(), clientContext);
Callables.batching(traced, batchingCallSettings.build(), clientContext);

MutateRowsUserFacingCallable userFacing =
new MutateRowsUserFacingCallable(batching, requestContext);
Expand All @@ -359,7 +401,7 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryingExecutor<Void> retryingExecutor =
RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new MutateRowsRetryingCallable(
Expand All @@ -381,7 +423,13 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
CheckAndMutateRowCallable userFacing =
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
TracedUnaryCallable<ConditionalRowMutation, Boolean> traced =
new TracedUnaryCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "CheckAndMutateRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -397,7 +445,13 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
ReadModifyWriteRowCallable userFacing =
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
TracedUnaryCallable<ReadModifyWriteRow, Row> traced =
new TracedUnaryCallable<>(
userFacing,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadModifyWriteRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.BatchingCallSettings;
Expand All @@ -28,6 +30,7 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.DummyBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -37,6 +40,7 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -268,6 +272,13 @@ private Builder() {
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

setTracerFactory(
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
readRowsSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public Void call() {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

// Make the actual call
ApiFuture<List<MutateRowsResponse>> innerFuture =
innerCallable.futureCall(currentRequest, currentCallContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -42,13 +42,13 @@
public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest, Void> {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutor<Void> executor;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutor<Void> executor,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
Expand All @@ -62,7 +62,7 @@ public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable);
RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.tracing;

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;

/**
* Simple wrapper around {@link ApiTracerFactory} to augment the client name of the generated
* traces.
*
* <p>This is used to disambiguate traces in underlying GAPIC client from the manually written
* overlay.
*
* <p>For internal use, public for technical reasons.
*/
@InternalApi
public class WrappedTracerFactory implements ApiTracerFactory {
private final ApiTracerFactory innerFactory;
private final String clientName;

public WrappedTracerFactory(ApiTracerFactory tracerFactory, String clientName) {
this.innerFactory = tracerFactory;
this.clientName = clientName;
}

@Override
public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
spanName = SpanName.of(clientName, spanName.getMethodName());

return innerFactory.newTracer(parent, spanName, operationType);
}
}