Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Instrument ReadAllBytes and CreateFrom in gRPC #2815

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,39 +315,51 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)
@Override
public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
throws IOException {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
return internalCreateFrom(path, blobInfo, opts);
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName());
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
return internalCreateFrom(path, blobInfo, opts, openTelemetryTraceUtil.currentContext());
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

@Override
public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> opts)
public Blob internalCreateFrom(
Path path, BlobInfo info, Opts<ObjectTargetOpt> opts, OpenTelemetryTraceUtil.Context ctx)
throws IOException {
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("internalCreateFrom", this.getClass().getName(), ctx);
requireNonNull(path, "path must be non null");
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}

GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(info, opts);

ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);
ApiFuture<GrpcResumableSession> session2 =
ApiFutures.transform(
start,
rw ->
ResumableSession.grpc(
getOptions(),
retryAlgorithmManager.idempotent(),
write,
storageClient.queryWriteStatusCallable(),
rw,
Hasher.noop()),
MoreExecutors.directExecutor());
try {
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(info, opts);

ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);
ApiFuture<GrpcResumableSession> session2 =
ApiFutures.transform(
start,
rw ->
ResumableSession.grpc(
getOptions(),
retryAlgorithmManager.idempotent(),
write,
storageClient.queryWriteStatusCallable(),
rw,
Hasher.noop()),
MoreExecutors.directExecutor());
GrpcResumableSession got = session2.get();
ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path));
Object object = put.getObject();
Expand All @@ -358,7 +370,11 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
}
return codecs.blobInfo().decode(object).asBlob(this);
} catch (InterruptedException | ExecutionException e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

Expand All @@ -372,37 +388,46 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption..
public Blob createFrom(
BlobInfo blobInfo, InputStream in, int bufferSize, BlobWriteOption... options)
throws IOException {
requireNonNull(blobInfo, "blobInfo must be non null");
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since createFrom has a few different signatures, does which signature we're working with need to be captured in the span?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are only concerned with the method not the parameters.

try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
requireNonNull(blobInfo, "blobInfo must be non null");

Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);

BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.resumable()
.withRetryConfig(getOptions(), retryAlgorithmManager.idempotent())
.buffered(Buffers.allocateAligned(bufferSize, _256KiB))
.setStartAsync(start)
.build();
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);

// Specifically not in the try-with, so we don't close the provided stream
ReadableByteChannel src =
Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES)));
try (BufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.resumable()
.withRetryConfig(getOptions(), retryAlgorithmManager.idempotent())
.buffered(Buffers.allocateAligned(bufferSize, _256KiB))
.setStartAsync(start)
.build();

// Specifically not in the try-with, so we don't close the provided stream
ReadableByteChannel src =
Channels.newChannel(firstNonNull(in, new ByteArrayInputStream(ZERO_BYTES)));
try (BufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(
io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
}
return getBlob(session.getResult());
} finally {
otelSpan.end();
}
return getBlob(session.getResult());
}

@Override
Expand Down Expand Up @@ -732,14 +757,21 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio

@Override
public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("readAllBytes", this.getClass().getName());
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (UnbufferedReadableByteChannel r = session.open();
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent();
UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(baos)) {
ByteStreams.copy(r, w);
} catch (ApiException | IOException e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
return baos.toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetO
throw new UnsupportedOperationException("not implemented");
}

default BlobInfo internalCreateFrom(
Path path, BlobInfo info, Opts<ObjectTargetOpt> opts, OpenTelemetryTraceUtil.Context ctx)
throws IOException {
throw new UnsupportedOperationException("not implemented");
}

default BlobInfo internalDirectUpload(
BlobInfo blobInfo, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -44,6 +47,8 @@ public class ITGrpcOpenTelemetryTest {
private StorageOptions options;
private SpanExporter exporter;
private Storage storage;
private static final byte[] helloWorldTextBytes = "hello world".getBytes();
private BlobId blobId;
@Inject public Generator generator;
@Inject public BucketInfo testBucket;

Expand All @@ -65,20 +70,17 @@ public void setUp() {
.setOpenTelemetrySdk(openTelemetrySdk)
.build();
storage = options.getService();
String objectString = generator.randomObjectName();
blobId = BlobId.of(testBucket.getName(), objectString);
}

@Test
public void runCreateBucket() {
String bucket = "random-bucket";
storage.create(BucketInfo.of(bucket));
TestExporter testExported = (TestExporter) exporter;
SpanData spanData = testExported.getExportedSpans().get(0);
Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage",
getAttributeValue(spanData, "gcp.client.artifact"));
Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system"));
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
}

@Test
Expand All @@ -88,17 +90,43 @@ public void runCreateBlob() {
storage.create(BlobInfo.newBuilder(toCreate).build(), content);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create")));
Assert.assertTrue(
spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload")));
Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext());
}

@Test
public void runReadAllBytes() {
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storage.create(blobInfo, helloWorldTextBytes);
byte[] read = storage.readAllBytes(blobId);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("readAllBytes")));
}

@Test
public void createFrom() throws IOException {
Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.createFrom(BlobInfo.newBuilder(blobId).build(), helloWorldTxtGz);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("internalCreateFrom")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system"));
}
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create")));
Assert.assertTrue(
spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload")));
Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext());
}

private String getAttributeValue(SpanData spanData, String key) {
Expand Down
Loading