Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Instrument HTTP createFrom #2824

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import com.google.common.primitives.Ints;
import io.opentelemetry.api.trace.StatusCode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -123,12 +124,14 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage,
final HttpRetryAlgorithmManager retryAlgorithmManager;
final StorageRpc storageRpc;
final WriterFactory writerFactory;
private final OpenTelemetryTraceUtil openTelemetryTraceUtil;

StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) {
super(options);
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
this.storageRpc = options.getStorageRpcV1();
this.writerFactory = writerFactory;
this.openTelemetryTraceUtil = OpenTelemetryTraceUtil.getInstance(options);
}

@Override
Expand Down Expand Up @@ -243,45 +246,55 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)
@Override
public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
long size = Files.size(path);
if (size == 0L) {
return create(blobInfo, null, options);
}
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
StorageObject encode = codecs.blobInfo().encode(updated);

Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
HttpClientContext.from(storageRpc),
getOptions().asRetryDependencies(),
retryAlgorithmManager.idempotent(),
jsonResumableWrite);
HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size);
ResumableOperationResult<StorageObject> put =
session.put(RewindableContent.of(path), contentRange);
// all exception translation is taken care of down in the JsonResumableSession
StorageObject object = put.getObject();
if (object == null) {
// if by some odd chance the put didn't get the StorageObject, query for it
ResumableOperationResult<@Nullable StorageObject> query = session.query();
object = query.getObject();
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName());
try (OpenTelemetryTraceUtil.Scope scope = otelSpan.makeCurrent()) {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
long size = Files.size(path);
if (size == 0L) {
return create(blobInfo, null, options);
}
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
StorageObject encode = codecs.blobInfo().encode(updated);

Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
HttpClientContext.from(storageRpc),
getOptions().asRetryDependencies(),
retryAlgorithmManager.idempotent(),
jsonResumableWrite);
HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size);
ResumableOperationResult<StorageObject> put =
session.put(RewindableContent.of(path), contentRange);
// all exception translation is taken care of down in the JsonResumableSession
StorageObject object = put.getObject();
if (object == null) {
// if by some odd chance the put didn't get the StorageObject, query for it
ResumableOperationResult<@Nullable StorageObject> query = session.query();
object = query.getObject();
}
return codecs.blobInfo().decode(object).asBlob(this);
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
return codecs.blobInfo().decode(object).asBlob(this);
}

@Override
Expand All @@ -294,20 +307,35 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption..
public Blob createFrom(
BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
throws IOException {

ApiFuture<BlobInfo> objectFuture;
try (StorageWriteChannel writer = writer(blobInfo, options)) {
objectFuture = writer.getObject();
uploadHelper(Channels.newChannel(content), writer, bufferSize);
}
// keep these two try blocks separate for the time being
// leaving the above will cause the writer to close writing and finalizing the session and
// (hopefully, on successful finalization) resolve our future
try {
BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS);
return info.asBlob(this);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw StorageException.coalesce(e);
OpenTelemetryTraceUtil.Span otelSpan =
openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName());
try (OpenTelemetryTraceUtil.Scope scope = otelSpan.makeCurrent()) {

ApiFuture<BlobInfo> objectFuture;
try (StorageWriteChannel writer = writer(blobInfo, options)) {
objectFuture = writer.getObject();
uploadHelper(Channels.newChannel(content), writer, bufferSize);
}
// keep these two try blocks separate for the time being
// leaving the above will cause the writer to close writing and finalizing the session and
// (hopefully, on successful finalization) resolve our future
try {
BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS);
return info.asBlob(this);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
otelSpan.recordException(e);
otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
}
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName());
// We don't want to wrap the storage exception, but we want to record any other exception
// we simply throw the exception after recording in the span.
throw e;

} finally {
otelSpan.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
Expand Down Expand Up @@ -152,6 +154,26 @@ public void runReadAllBytes() {
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("load")));
}

@Test
public void runCreateFromPath() throws IOException {
Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.createFrom(BlobInfo.newBuilder(blobId).build(), helloWorldTxtGz);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
}

@Test
public void runCreateFromInputStream() throws IOException {
InputStream inputStream = new ByteArrayInputStream(helloWorldTextBytes);
storage.createFrom(BlobInfo.newBuilder(blobId).build(), inputStream);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Expand Down
Loading
Loading