Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Instrument gRPC downloadTo and Copy #2818

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import com.google.cloud.storage.UnifiedOpts.ProjectId;
import com.google.cloud.storage.UnifiedOpts.UserProject;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -698,56 +700,66 @@ public Blob compose(ComposeRequest composeRequest) {

@Override
public CopyWriter copy(CopyRequest copyRequest) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a confusing diff but basically the try,catch and finally block are new and the rest has been indented within the try block unchanged.

BlobId src = copyRequest.getSource();
BlobInfo dst = copyRequest.getTarget();
Opts<ObjectSourceOpt> srcOpts =
Opts.unwrap(copyRequest.getSourceOptions())
.projectAsSource()
.resolveFrom(src)
.prepend(defaultOpts);
Opts<ObjectTargetOpt> dstOpts =
Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(dst).prepend(defaultOpts);

Mapper<RewriteObjectRequest.Builder> mapper =
srcOpts.rewriteObjectsRequest().andThen(dstOpts.rewriteObjectsRequest());

Object srcProto = codecs.blobId().encode(src);
Object dstProto = codecs.blobInfo().encode(dst);

RewriteObjectRequest.Builder b =
RewriteObjectRequest.newBuilder()
.setDestinationName(dstProto.getName())
.setDestinationBucket(dstProto.getBucket())
// destination_kms_key comes from dstOpts
// according to the docs in the protos, it is illegal to populate the following fields,
// clear them out if they are set
// destination_predefined_acl comes from dstOpts
// if_*_match come from srcOpts and dstOpts
// copy_source_encryption_* come from srcOpts
// common_object_request_params come from dstOpts
.setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build())
.setSourceBucket(srcProto.getBucket())
.setSourceObject(srcProto.getName());

if (src.getGeneration() != null) {
b.setSourceGeneration(src.getGeneration());
}
Span otelSpan = openTelemetryTraceUtil.startSpan("copy", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
BlobId src = copyRequest.getSource();
BlobInfo dst = copyRequest.getTarget();
Opts<ObjectSourceOpt> srcOpts =
Opts.unwrap(copyRequest.getSourceOptions())
.projectAsSource()
.resolveFrom(src)
.prepend(defaultOpts);
Opts<ObjectTargetOpt> dstOpts =
Opts.unwrap(copyRequest.getTargetOptions()).resolveFrom(dst).prepend(defaultOpts);

Mapper<RewriteObjectRequest.Builder> mapper =
srcOpts.rewriteObjectsRequest().andThen(dstOpts.rewriteObjectsRequest());

Object srcProto = codecs.blobId().encode(src);
Object dstProto = codecs.blobInfo().encode(dst);

RewriteObjectRequest.Builder b =
RewriteObjectRequest.newBuilder()
.setDestinationName(dstProto.getName())
.setDestinationBucket(dstProto.getBucket())
// destination_kms_key comes from dstOpts
// according to the docs in the protos, it is illegal to populate the following
// fields,
// clear them out if they are set
// destination_predefined_acl comes from dstOpts
// if_*_match come from srcOpts and dstOpts
// copy_source_encryption_* come from srcOpts
// common_object_request_params come from dstOpts
.setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build())
.setSourceBucket(srcProto.getBucket())
.setSourceObject(srcProto.getName());

if (src.getGeneration() != null) {
b.setSourceGeneration(src.getGeneration());
}

if (copyRequest.getMegabytesCopiedPerChunk() != null) {
b.setMaxBytesRewrittenPerCall(copyRequest.getMegabytesCopiedPerChunk() * _1MiB);
}
if (copyRequest.getMegabytesCopiedPerChunk() != null) {
b.setMaxBytesRewrittenPerCall(copyRequest.getMegabytesCopiedPerChunk() * _1MiB);
}

RewriteObjectRequest req = mapper.apply(b).build();
GrpcCallContext grpcCallContext =
srcOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
UnaryCallable<RewriteObjectRequest, RewriteResponse> callable =
storageClient.rewriteObjectCallable().withDefaultCallContext(grpcCallContext);
GrpcCallContext retryContext = Retrying.newCallContext();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> callable.call(req, retryContext),
(resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp));
RewriteObjectRequest req = mapper.apply(b).build();
GrpcCallContext grpcCallContext =
srcOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
UnaryCallable<RewriteObjectRequest, RewriteResponse> callable =
storageClient.rewriteObjectCallable().withDefaultCallContext(grpcCallContext);
GrpcCallContext retryContext = Retrying.newCallContext();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> callable.call(req, retryContext),
(resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp));
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

@Override
Expand Down Expand Up @@ -803,27 +815,39 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {

@Override
public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
Span otelSpan = openTelemetryTraceUtil.startSpan("downloadTo", this.getClass().getName());

UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);

try (UnbufferedReadableByteChannel r = session.open();
try (Scope unused = otelSpan.makeCurrent();
UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
ByteStreams.copy(r, w);
} catch (ApiException | IOException e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

@Override
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
Span otelSpan = openTelemetryTraceUtil.startSpan("downloadTo", this.getClass().getName());

UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);

try (UnbufferedReadableByteChannel r = session.open();
try (Scope unused = otelSpan.makeCurrent();
UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(outputStream)) {
ByteStreams.copy(r, w);
} catch (ApiException | IOException e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.Inject;
Expand All @@ -31,9 +34,11 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -51,6 +56,7 @@ public class ITGrpcOpenTelemetryTest {
private BlobId blobId;
@Inject public Generator generator;
@Inject public BucketInfo testBucket;
private static final Path tmpDir = Paths.get(System.getProperty("java.io.tmpdir"));

@Before
public void setUp() {
Expand Down Expand Up @@ -109,7 +115,7 @@ public void runReadAllBytes() {
}

@Test
public void createFrom() throws IOException {
public void runCreateFrom() throws IOException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed for continuity of naming for the rest of the tests, unrelated.

Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.createFrom(BlobInfo.newBuilder(blobId).build(), helloWorldTxtGz);
TestExporter testExported = (TestExporter) exporter;
Expand All @@ -119,6 +125,51 @@ public void createFrom() throws IOException {
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("internalCreateFrom")));
}

@Test
public void runDownloadToPath() throws IOException {
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storage.create(blobInfo, helloWorldTextBytes);
try (TmpFile file = TmpFile.of(tmpDir, "download-to", ".txt")) {
storage.downloadTo(blobId, file.getPath());
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("downloadTo")));
}
}

@Test
public void runDownloadToOutputStream() {
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storage.create(blobInfo, helloWorldTextBytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
storage.downloadTo(blobId, baos);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("downloadTo")));
}

@Test
public void runCopy() {
BlobInfo info =
BlobInfo.newBuilder(testBucket, generator.randomObjectName() + "copy/src").build();
Blob cpySrc = storage.create(info, helloWorldTextBytes, BlobTargetOption.doesNotExist());
BlobInfo dst =
BlobInfo.newBuilder(testBucket, generator.randomObjectName() + "copy/dst").build();
CopyRequest copyRequest =
CopyRequest.newBuilder()
.setSource(cpySrc.getBlobId())
.setSourceOptions(BlobSourceOption.generationMatch(cpySrc.getGeneration()))
.setTarget(dst, BlobTargetOption.doesNotExist())
.build();
storage.copy(copyRequest);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("copy")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Expand Down
Loading