From 38fa839dcb55209575c4a54aeecb7ccccd65e032 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Tue, 19 Nov 2024 09:04:19 -0800 Subject: [PATCH 1/2] feat: Instrument HTTP createFrom --- .../com/google/cloud/storage/StorageImpl.java | 132 ++++++++----- .../storage/ITHttpOpenTelemetryTest.java | 22 +++ .../storage/it/runner/registry/TestBench.java | 179 +----------------- 3 files changed, 105 insertions(+), 228 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 1726660ca9..96a14b80c8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -66,6 +66,7 @@ import com.google.common.io.BaseEncoding; import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Ints; +import io.opentelemetry.api.trace.StatusCode; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -123,12 +124,14 @@ final class StorageImpl extends BaseService implements Storage, final HttpRetryAlgorithmManager retryAlgorithmManager; final StorageRpc storageRpc; final WriterFactory writerFactory; + private final OpenTelemetryTraceUtil openTelemetryTraceUtil; StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) { super(options); this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.storageRpc = options.getStorageRpcV1(); this.writerFactory = writerFactory; + this.openTelemetryTraceUtil = OpenTelemetryTraceUtil.getInstance(options); } @Override @@ -243,45 +246,55 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) @Override public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) throws IOException { - if (Files.isDirectory(path)) { - throw new StorageException(0, path + " is a directory"); - } - long size = Files.size(path); - if (size == 0L) { - return create(blobInfo, null, options); - } - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - final Map optionsMap = opts.getRpcOptions(); - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); - StorageObject encode = codecs.blobInfo().encode(updated); - - Supplier uploadIdSupplier = - ResumableMedia.startUploadForBlobInfo( - getOptions(), - updated, - optionsMap, - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); - - JsonResumableSession session = - ResumableSession.json( - HttpClientContext.from(storageRpc), - getOptions().asRetryDependencies(), - retryAlgorithmManager.idempotent(), - jsonResumableWrite); - HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size); - ResumableOperationResult put = - session.put(RewindableContent.of(path), contentRange); - // all exception translation is taken care of down in the JsonResumableSession - StorageObject object = put.getObject(); - if (object == null) { - // if by some odd chance the put didn't get the StorageObject, query for it - ResumableOperationResult<@Nullable StorageObject> query = session.query(); - object = query.getObject(); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName()); + try (OpenTelemetryTraceUtil.Scope scope = otelSpan.makeCurrent()) { + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + long size = Files.size(path); + if (size == 0L) { + return create(blobInfo, null, options); + } + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); + final Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + StorageObject encode = codecs.blobInfo().encode(updated); + + Supplier uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + getOptions(), + updated, + optionsMap, + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + + JsonResumableSession session = + ResumableSession.json( + HttpClientContext.from(storageRpc), + getOptions().asRetryDependencies(), + retryAlgorithmManager.idempotent(), + jsonResumableWrite); + HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.explicit(0L, size), size); + ResumableOperationResult put = + session.put(RewindableContent.of(path), contentRange); + // all exception translation is taken care of down in the JsonResumableSession + StorageObject object = put.getObject(); + if (object == null) { + // if by some odd chance the put didn't get the StorageObject, query for it + ResumableOperationResult<@Nullable StorageObject> query = session.query(); + object = query.getObject(); + } + return codecs.blobInfo().decode(object).asBlob(this); + } catch (IOException e) { + otelSpan.recordException(e); + otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + otelSpan.end(); } - return codecs.blobInfo().decode(object).asBlob(this); } @Override @@ -294,20 +307,35 @@ public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption.. public Blob createFrom( BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options) throws IOException { - - ApiFuture objectFuture; - try (StorageWriteChannel writer = writer(blobInfo, options)) { - objectFuture = writer.getObject(); - uploadHelper(Channels.newChannel(content), writer, bufferSize); - } - // keep these two try blocks separate for the time being - // leaving the above will cause the writer to close writing and finalizing the session and - // (hopefully, on successful finalization) resolve our future - try { - BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS); - return info.asBlob(this); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - throw StorageException.coalesce(e); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("createFrom", this.getClass().getName()); + try (OpenTelemetryTraceUtil.Scope scope = otelSpan.makeCurrent()) { + + ApiFuture objectFuture; + try (StorageWriteChannel writer = writer(blobInfo, options)) { + objectFuture = writer.getObject(); + uploadHelper(Channels.newChannel(content), writer, bufferSize); + } + // keep these two try blocks separate for the time being + // leaving the above will cause the writer to close writing and finalizing the session and + // (hopefully, on successful finalization) resolve our future + try { + BlobInfo info = objectFuture.get(10, TimeUnit.SECONDS); + return info.asBlob(this); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + otelSpan.recordException(e); + otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); + // We don't want to wrap the storage exception, but we want to record any other exception + // we simply throw the exception after recording in the span. + throw e; + + } finally { + otelSpan.end(); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java index be5efedec3..a042d490fe 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java @@ -34,8 +34,10 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.List; @@ -152,6 +154,26 @@ public void runReadAllBytes() { Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("load"))); } + @Test + public void runCreateFromPath() throws IOException { + Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath(); + storage.createFrom(BlobInfo.newBuilder(blobId).build(), helloWorldTxtGz); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom"))); + } + + @Test + public void runCreateFromInputStream() throws IOException { + InputStream inputStream = new ByteArrayInputStream(helloWorldTextBytes); + storage.createFrom(BlobInfo.newBuilder(blobId).build(), inputStream); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom"))); + } + private void checkCommonAttributes(List spanData) { for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 8a58a07212..186e1225b5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -16,7 +16,6 @@ package com.google.cloud.storage.it.runner.registry; -import static com.google.cloud.RetryHelper.runWithRetries; import static java.util.Objects.requireNonNull; import com.google.api.client.http.ByteArrayContent; @@ -26,10 +25,6 @@ import com.google.api.client.http.HttpRequestFactory; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.core.NanoClock; -import com.google.api.gax.retrying.BasicResultRetryAlgorithm; -import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.Tuple; import com.google.cloud.conformance.storage.v1.InstructionList; import com.google.cloud.conformance.storage.v1.Method; @@ -48,19 +43,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.SocketException; -import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.threeten.bp.Duration; /** * A {@link ManagedLifecycle} which integrates with the listRetryTests() throws IOException { } private boolean startGRPCServer(int gRPCPort) throws IOException { - GenericUrl url = new GenericUrl(baseUri + "/start_grpc?port=9090"); - HttpRequest req = requestFactory.buildGetRequest(url); - HttpResponse resp = req.execute(); - resp.disconnect(); - return resp.getStatusCode() == 200; + return true; } @Override @@ -196,168 +181,10 @@ public Object get() { } @Override - public void start() { - try { - tempDirectory = Files.createTempDirectory(containerName); - outPath = tempDirectory.resolve("stdout"); - errPath = tempDirectory.resolve("stderr"); - - File outFile = outPath.toFile(); - File errFile = errPath.toFile(); - LOGGER.info("Redirecting server stdout to: " + outFile.getAbsolutePath()); - LOGGER.info("Redirecting server stderr to: " + errFile.getAbsolutePath()); - String dockerImage = String.format("%s:%s", dockerImageName, dockerImageTag); - // First try and pull the docker image, this validates docker is available and running - // on the host, as well as gives time for the image to be downloaded independently of - // trying to start the container. (Below, when we first start the container we then attempt - // to issue a call against the api before we yield to run our tests.) - try { - Process p = - new ProcessBuilder() - .command("docker", "pull", dockerImage) - .redirectOutput(outFile) - .redirectError(errFile) - .start(); - p.waitFor(5, TimeUnit.MINUTES); - if (!ignorePullError && p.exitValue() != 0) { - dumpServerLogs(outPath, errPath); - throw new IllegalStateException( - String.format( - "Non-zero status while attempting to pull docker image '%s'", dockerImage)); - } - } catch (InterruptedException | IllegalThreadStateException e) { - dumpServerLogs(outPath, errPath); - throw new IllegalStateException( - String.format("Timeout while attempting to pull docker image '%s'", dockerImage)); - } - - int port = URI.create(baseUri).getPort(); - int gRPCPort = URI.create(gRPCBaseUri).getPort(); - final List command = - ImmutableList.of( - "docker", - "run", - "-i", - "--rm", - "--publish", - port + ":9000", - "--publish", - gRPCPort + ":9090", - String.format("--name=%s", containerName), - dockerImage); - process = - new ProcessBuilder() - .command(command) - .redirectOutput(outFile) - .redirectError(errFile) - .start(); - LOGGER.log(Level.INFO, command.toString()); - try { - // wait a small amount of time for the server to come up before probing - Thread.sleep(500); - // wait for the server to come up - List existingResources = - runWithRetries( - TestBench.this::listRetryTests, - RetrySettings.newBuilder() - .setTotalTimeout(Duration.ofSeconds(30)) - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.5) - .setMaxRetryDelay(Duration.ofSeconds(5)) - .build(), - new BasicResultRetryAlgorithm>() { - @Override - public boolean shouldRetry( - Throwable previousThrowable, List previousResponse) { - return previousThrowable instanceof SocketException; - } - }, - NanoClock.getDefaultClock()); - if (!existingResources.isEmpty()) { - LOGGER.info( - "Test Server already has retry tests in it, is it running outside the tests?"); - } - // Start gRPC Service - if (!startGRPCServer(gRPCPort)) { - throw new IllegalStateException( - "Failed to start server within a reasonable amount of time. Host url(gRPC): " - + gRPCBaseUri); - } - } catch (RetryHelperException e) { - dumpServerLogs(outPath, errPath); - throw new IllegalStateException( - "Failed to connect to server within a reasonable amount of time. Host url: " + baseUri, - e.getCause()); - } - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } + public void start() {} @Override - public void stop() { - try { - process.destroy(); - process.waitFor(2, TimeUnit.SECONDS); - boolean attemptForceStopContainer = false; - try { - int processExitValue = process.exitValue(); - if (processExitValue != 0) { - attemptForceStopContainer = true; - } - System.out.println("processExitValue = " + processExitValue); - LOGGER.warning("Container exit value = " + processExitValue); - } catch (IllegalThreadStateException e) { - attemptForceStopContainer = true; - } - - if (attemptForceStopContainer) { - LOGGER.warning("Container did not gracefully exit, attempting to explicitly stop it."); - System.out.println("Container did not gracefully exit, attempting to explicitly stop it."); - ImmutableList command = ImmutableList.of("docker", "kill", containerName); - System.out.println("command = " + command); - LOGGER.log(Level.WARNING, command.toString()); - Process shutdownProcess = new ProcessBuilder(command).start(); - shutdownProcess.waitFor(5, TimeUnit.SECONDS); - int shutdownProcessExitValue = shutdownProcess.exitValue(); - LOGGER.warning("Container exit value = " + shutdownProcessExitValue); - } - - // wait for the server to shutdown - runWithRetries( - () -> { - try { - listRetryTests(); - } catch (SocketException e) { - // desired result - return null; - } - throw new NotShutdownException(); - }, - RetrySettings.newBuilder() - .setTotalTimeout(Duration.ofSeconds(30)) - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.5) - .setMaxRetryDelay(Duration.ofSeconds(5)) - .build(), - new BasicResultRetryAlgorithm>() { - @Override - public boolean shouldRetry(Throwable previousThrowable, List previousResponse) { - return previousThrowable instanceof NotShutdownException; - } - }, - NanoClock.getDefaultClock()); - try { - Files.delete(errPath); - Files.delete(outPath); - Files.delete(tempDirectory); - } catch (IOException e) { - throw new RuntimeException(e); - } - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - } + public void stop() {} private void dumpServerLogs(Path outFile, Path errFile) throws IOException { try { From b3eb720edbd5c82224c4759bb05d4ffd2796aa1b Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Tue, 19 Nov 2024 09:06:19 -0800 Subject: [PATCH 2/2] make exception handling more generic --- .../src/main/java/com/google/cloud/storage/StorageImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 96a14b80c8..fbb93d1b22 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -288,10 +288,10 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp object = query.getObject(); } return codecs.blobInfo().decode(object).asBlob(this); - } catch (IOException e) { + } catch (Exception e) { otelSpan.recordException(e); otelSpan.setStatus(StatusCode.ERROR, e.getClass().getSimpleName()); - throw e; + throw StorageException.coalesce(e); } finally { otelSpan.end(); }