Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sydney-munro committed Oct 28, 2024
1 parent 3543adf commit b4d2591
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ public void close() throws Exception {

@Override
public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) {
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("create(Bucket, BucketTargetOption");
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create");
Opts<BucketTargetOpt> opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
Expand Down Expand Up @@ -249,8 +248,7 @@ public Blob create(
BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
// Start the otel span to retain information of the origin of the request
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("create(BlobInfo, BlobTargetOption");
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create");
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
return internalDirectUpload(
blobInfo,
Expand All @@ -269,7 +267,8 @@ public Blob create(

@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create");
try (OpenTelemetryTraceUtil.Scope ununsed = otelSpan.makeCurrent()) {
requireNonNull(blobInfo, "blobInfo must be non null");
InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES));

Expand All @@ -296,7 +295,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op
ApiFuture<WriteObjectResponse> responseApiFuture = session.getResult();
return this.getBlob(responseApiFuture);
} catch (IOException | ApiException e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

Expand Down Expand Up @@ -812,6 +815,12 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
hasher);
}

@Override
public BlobInfo internalDirectUpload(
BlobInfo blobInfo, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
return internalDirectUpload(blobInfo, opts, buf, null);
}

@Override
public BlobInfo internalDirectUpload(
BlobInfo blobInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ public void close() throws IOException {
// We never created any parts
// create an empty object
try {
BlobInfo blobInfo =
storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0), null);
// TODO: Add in Otel context when available
BlobInfo blobInfo = storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0));
finalObject.set(blobInfo);
return;
} catch (StorageException se) {
Expand Down Expand Up @@ -287,7 +287,7 @@ private void internalFlush(ByteBuffer buf) {
info -> {
try {
// TODO: Add in Otel context when available
return storage.internalDirectUpload(info, partOpts, buf, null);
return storage.internalDirectUpload(info, partOpts, buf);
} catch (StorageException e) {
// a precondition failure usually means the part was created, but we didn't get the
// response. And when we tried to retry the object already exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetO
throw new UnsupportedOperationException("not implemented");
}

default BlobInfo internalDirectUpload(
BlobInfo blobInfo, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
throw new UnsupportedOperationException("not implemented");
}

default BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ public void runCreateBlob() {
storage.create(BlobInfo.newBuilder(toCreate).build(), content);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
// (1) Span when calling create
// (2) Span when passing call to internalDirectUpload
Assert.assertEquals(2, spanData.size());
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system"));
}
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create")));
Assert.assertTrue(
spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload")));
Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -357,12 +356,9 @@ public void partsRetainMetadata() throws Exception {
new FakeStorageInternal() {
@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
metadatas.add(info.getMetadata());
return super.internalDirectUpload(info, opts, buf, null);
return super.internalDirectUpload(info, opts, buf);
}

@Override
Expand Down Expand Up @@ -450,10 +446,7 @@ public void creatingAnEmptyObjectWhichFailsIsSetAsResultFailureAndThrowFromClose
new FakeStorageInternal() {
@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
throw StorageException.coalesce(
ApiExceptionFactory.createException(
null, GrpcStatusCode.of(Code.PERMISSION_DENIED), false));
Expand Down Expand Up @@ -564,10 +557,7 @@ public void shortCircuitExceptionResultsInFastFailure() throws Exception {
new FakeStorageInternal() {
@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
if (induceFailure.getAndSet(false)) {
Uninterruptibles.awaitUninterruptibly(blockForWrite1);
try {
Expand All @@ -581,7 +571,7 @@ public BlobInfo internalDirectUpload(
blockForWrite1Complete.countDown();
}
} else {
return super.internalDirectUpload(info, opts, buf, null);
return super.internalDirectUpload(info, opts, buf);
}
}
};
Expand Down Expand Up @@ -724,11 +714,8 @@ public void partFailedPreconditionOnRetryIsHandledGracefully() throws Exception
new FakeStorageInternal() {
@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf, null);
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf);
if (info.getName().equals(p1.getName())) {
throw StorageException.coalesce(
ApiExceptionFactory.createException(
Expand Down Expand Up @@ -791,10 +778,7 @@ public void partMetadataFieldDecoratorUsesCustomTime() throws IOException {
new FakeStorageInternal() {
@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
if (info.getBlobId().getName().endsWith(".part")) {
// Kinda hacky but since we are creating multiple parts we will use a range
// to ensure the customTimes are being calculated appropriately
Expand All @@ -803,7 +787,7 @@ public BlobInfo internalDirectUpload(
} else {
assertThat(info.getCustomTimeOffsetDateTime()).isNull();
}
return super.internalDirectUpload(info, opts, buf, null);
return super.internalDirectUpload(info, opts, buf);
}
};
ParallelCompositeUploadWritableByteChannel pcu =
Expand Down Expand Up @@ -859,10 +843,7 @@ private static class FakeStorageInternal implements StorageInternal {

@Override
public BlobInfo internalDirectUpload(
BlobInfo info,
Opts<ObjectTargetOpt> opts,
ByteBuffer buf,
OpenTelemetryTraceUtil.Context ctx) {
BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
BlobId id = info.getBlobId();

BlobInfo.Builder b = info.toBuilder();
Expand Down

0 comments on commit b4d2591

Please sign in to comment.