Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry Traces to GRPC #2783

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/sync-repo-settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ branchProtectionRules:
- 'Kokoro - Test: Java GraalVM Native Image'
- 'Kokoro - Test: Java 17 GraalVM Native Image'
- javadoc
- pattern: otel-v1-branch
isAdminEnforced: true
requiredApprovingReviewCount: 1
requiresCodeOwnerReviews: true
requiresStrictStatusChecks: false
requiredStatusCheckContexts:
- dependencies (17)
- lint
- clirr
- units (8)
- units (11)
- 'Kokoro - Test: Integration'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Kokoro need to be updated internally to watch this branch as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ill double check!

- cla/google
- 'Kokoro - Test: Java GraalVM Native Image'
- 'Kokoro - Test: Java 17 GraalVM Native Image'
- javadoc
permissionRules:
- team: yoshi-admins
permission: admin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.UnifiedOpts.ProjectId;
import com.google.cloud.storage.UnifiedOpts.UserProject;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -168,6 +169,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
// workaround for https://github.com/googleapis/java-storage/issues/1736
private final Opts<UserProject> defaultOpts;
@Deprecated private final ProjectId defaultProjectId;
private final OpenTelemetryTraceUtil openTelemetryTraceUtil;

GrpcStorageImpl(
GrpcStorageOptions options,
Expand All @@ -184,6 +186,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
this.syntaxDecoders = new SyntaxDecoders();
this.defaultProjectId = UnifiedOpts.projectId(options.getProjectId());
this.openTelemetryTraceUtil = OpenTelemetryTraceUtil.getInstance(options);
}

@Override
Expand All @@ -198,6 +201,8 @@ public void close() throws Exception {

@Override
public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) {
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("create(Bucket, BucketTargetOption");
Opts<BucketTargetOpt> opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
Expand All @@ -212,11 +217,20 @@ public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) {
.setParent("projects/_");
CreateBucketRequest req = opts.createBucketsRequest().apply(builder).build();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.createBucketCallable().call(req, merge),
syntaxDecoders.bucket);
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.createBucketCallable().call(req, merge),
syntaxDecoders.bucket);
} catch (Exception ex) {
otelSpan.recordException(ex);
otelSpan.setStatus(
io.opentelemetry.api.trace.StatusCode.ERROR, ex.getClass().getSimpleName());
throw StorageException.coalesce(ex);
} finally {
otelSpan.end();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public final class GrpcStorageOptions extends StorageOptions
private final boolean grpcClientMetricsManuallyEnabled;
private final GrpcInterceptorProvider grpcInterceptorProvider;
private final BlobWriteSessionConfig blobWriteSessionConfig;
private OpenTelemetrySdk openTelemetrySdk;

private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) {
super(builder, serviceDefaults);
Expand All @@ -134,6 +135,7 @@ private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults)
this.grpcClientMetricsManuallyEnabled = builder.grpcMetricsManuallyEnabled;
this.grpcInterceptorProvider = builder.grpcInterceptorProvider;
this.blobWriteSessionConfig = builder.blobWriteSessionConfig;
this.openTelemetrySdk = builder.openTelemetrySdk;
}

@Override
Expand Down Expand Up @@ -350,7 +352,7 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw

@Override
public OpenTelemetrySdk getOpenTelemetrySdk() {
return null;
return openTelemetrySdk;
}

/** @since 2.14.0 This new api is in preview and is subject to breaking changes. */
Expand Down Expand Up @@ -435,6 +437,8 @@ public static final class Builder extends StorageOptions.Builder {

private boolean grpcMetricsManuallyEnabled = false;

private OpenTelemetrySdk openTelemetrySdk;

Builder() {}

Builder(StorageOptions options) {
Expand All @@ -446,6 +450,7 @@ public static final class Builder extends StorageOptions.Builder {
this.enableGrpcClientMetrics = gso.enableGrpcClientMetrics;
this.grpcInterceptorProvider = gso.grpcInterceptorProvider;
this.blobWriteSessionConfig = gso.blobWriteSessionConfig;
this.openTelemetrySdk = gso.openTelemetrySdk;
}

/**
Expand Down Expand Up @@ -634,9 +639,15 @@ public GrpcStorageOptions.Builder setBlobWriteSessionConfig(
return this;
}

@Override
public StorageOptions.Builder setOpenTelemetrySdk(@NonNull OpenTelemetrySdk openTelemetrySdk) {
return null;
/**
* Enable OpenTelemetry Tracing and provide an instance for the client to use.
*
* @param openTelemetrySdk User defined instance of OpenTelemetry SDK to be used by the library
*/
public GrpcStorageOptions.Builder setOpenTelemetrySdk(OpenTelemetrySdk openTelemetrySdk) {
requireNonNull(openTelemetrySdk, "openTelemetry must be non null");
this.openTelemetrySdk = openTelemetrySdk;
return this;
}

/** @since 2.14.0 This new api is in preview and is subject to breaking changes. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.GaxProperties;
import com.google.cloud.storage.GrpcStorageOptions;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Context;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.SpanBuilder;
Expand All @@ -38,11 +40,14 @@ class OpenTelemetryInstance implements OpenTelemetryTraceUtil {

private static final String LIBRARY_NAME = "cloud.google.com/java/storage";

private final String transport;

public OpenTelemetryInstance(StorageOptions storageOptions) {
this.storageOptions = storageOptions;
this.openTelemetry = storageOptions.getOpenTelemetrySdk();
this.tracer =
openTelemetry.getTracer(LIBRARY_NAME, GaxProperties.getLibraryVersion(this.getClass()));
this.transport = storageOptions instanceof GrpcStorageOptions ? "grpc" : "http";
}

static class Span implements OpenTelemetryTraceUtil.Span {
Expand All @@ -56,7 +61,12 @@ static class Span implements OpenTelemetryTraceUtil.Span {

@Override
public OpenTelemetryTraceUtil.Span recordException(Throwable error) {
span.recordException(error);
span.recordException(
error,
Attributes.of(
AttributeKey.stringKey("exception.message"), error.getMessage(),
AttributeKey.stringKey("exception.type"), error.getClass().getName(),
AttributeKey.stringKey("exception.stacktrace"), error.getStackTrace().toString()));
return this;
}

Expand Down Expand Up @@ -146,6 +156,7 @@ public Scope makeCurrent() {
public OpenTelemetryTraceUtil.Span startSpan(String methodName) {
String formatSpanName = String.format("%s.%s/%s", "storage", "client", methodName);
SpanBuilder spanBuilder = tracer.spanBuilder(formatSpanName).setSpanKind(SpanKind.CLIENT);
spanBuilder.setAttribute("rpc.system", transport);
io.opentelemetry.api.trace.Span span =
addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
return new Span(span, formatSpanName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

package com.google.cloud.storage;

import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.CrossRun;
import com.google.cloud.storage.it.runner.annotations.Inject;
import com.google.cloud.storage.it.runner.registry.Generator;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -33,16 +28,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(StorageITRunner.class)
@CrossRun(
backends = {Backend.PROD},
transports = {Transport.HTTP})
public final class ITOpenTelemetryTest {
@Inject public Generator generator;

@Test
public void checkInstrumentation() {
Expand All @@ -58,29 +48,80 @@ public void checkInstrumentation() {
StorageOptions storageOptions =
StorageOptions.http().setOpenTelemetrySdk(openTelemetrySdk).build();
Storage storage = storageOptions.getService();
storage.create(BucketInfo.of(generator.randomBucketName()));
String bucket = randomBucketName();
storage.create(BucketInfo.of(bucket));
TestExporter testExported = (TestExporter) exporter;
SpanData spanData = testExported.getExportedSpans().get(0);
Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage",
getAttributeValue(spanData, "gcp.client.artifact"));
Assert.assertEquals("http", getAttributeValue(spanData, "rpc.system"));

// Cleanup
RemoteStorageHelper.forceDelete(storage, bucket);
}

@Test
public void noOpDoesNothing() {
StorageOptions storageOptions = StorageOptions.http().build();
public void checkInstrumentationGrpc() {
SpanExporter exporter = new TestExporter();

OpenTelemetrySdk openTelemetrySdk =
OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(exporter))
.build())
.build();
StorageOptions storageOptions =
StorageOptions.grpc().setOpenTelemetrySdk(openTelemetrySdk).build();
Storage storage = storageOptions.getService();
storage.create(BucketInfo.of(generator.randomBucketName()));
String bucket = randomBucketName();
storage.create(BucketInfo.of(bucket));
TestExporter testExported = (TestExporter) exporter;
SpanData spanData = testExported.getExportedSpans().get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan on testing which spans are collected per operation in addition to Span attributes? I think this would be more useful for Upload and Download ops.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if i understand this question. what do you mean by which spans are collected

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh i meant; let's say you're uploading a large object to GCS. I would assume there's a span for CreateResumableUpload, then span for each WriteObject request. Wondering if there's a way to test that using spanData.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see, I haven't expanded this for that case yet but yeah we would want to test each span is generated. Maybe we could check for the size to make sure if we expect 3 writeObjectRequests we would have 3 spans

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good start to assert span count; and expand from there.

Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage",
getAttributeValue(spanData, "gcp.client.artifact"));
Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system"));

// Cleanup
RemoteStorageHelper.forceDelete(storage, bucket);
}

@Test
public void noOpDoesNothing() {
String httpBucket = randomBucketName();
String grpcBucket = randomBucketName();
// NoOp for HTTP
StorageOptions storageOptionsHttp = StorageOptions.http().build();
Storage storageHttp = storageOptionsHttp.getService();
storageHttp.create(BucketInfo.of(httpBucket));

// NoOp for Grpc
StorageOptions storageOptionsGrpc = StorageOptions.grpc().build();
Storage storageGrpc = storageOptionsGrpc.getService();
storageGrpc.create(BucketInfo.of(grpcBucket));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is no-op testing and checking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just checking to make sure that it still runs without issue and does nothing. basically just that it doesnt error out.


// cleanup
RemoteStorageHelper.forceDelete(storageHttp, httpBucket);
RemoteStorageHelper.forceDelete(storageGrpc, grpcBucket);
}

private String getAttributeValue(SpanData spanData, String key) {
return spanData.getAttributes().get(AttributeKey.stringKey(key)).toString();
}

public String randomBucketName() {
return "java-storage-grpc-rand-" + UUID.randomUUID();
}
}

class TestExporter implements SpanExporter {

public final List<SpanData> exportedSpans = Collections.synchronizedList(new ArrayList<>());

@Override
Expand Down
Loading