Skip to content

Commit

Permalink
fix: update retry lifecycle when attempting to decompress a gzip obje…
Browse files Browse the repository at this point in the history
…ct (#2840)

If the initial response failed with a retryable error and the error should be retried, it wasn't. It would only retry for reading bytes after the initial response had been received.
  • Loading branch information
BenWhitehead authored Dec 12, 2024
1 parent 8b1bb43 commit 7dba13c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -84,7 +84,17 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
this.storage = storage;
this.result = result;
this.options = options;
this.resultRetryAlgorithm = resultRetryAlgorithm;
this.resultRetryAlgorithm =
new BasicResultRetryAlgorithm<Object>() {
@Override
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
boolean shouldRetry = resultRetryAlgorithm.shouldRetry(previousThrowable, null);
if (previousThrowable != null && !shouldRetry) {
result.setException(previousThrowable);
}
return shouldRetry;
}
};
this.open = true;
this.returnEOF = false;
this.position = apiaryReadRequest.getByteRangeSpec().beginOffset();
Expand Down Expand Up @@ -210,17 +220,11 @@ private ScatteringByteChannel open() {
throw new StorageException(404, "Failure while trying to resume download", e);
}
}
StorageException translate = StorageException.translate(e);
result.setException(translate);
throw translate;
throw StorageException.translate(e);
} catch (IOException e) {
StorageException translate = StorageException.translate(e);
result.setException(translate);
throw translate;
throw StorageException.translate(e);
} catch (Throwable t) {
BaseServiceException coalesce = StorageException.coalesce(t);
result.setException(coalesce);
throw coalesce;
throw StorageException.coalesce(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand Down Expand Up @@ -87,7 +88,18 @@ final class GapicUnbufferedReadableByteChannel
this.blobOffset = req.getReadOffset();
this.rclm = rclm;
this.retryingDeps = retryingDependencies;
this.alg = alg;
this.alg =
new BasicResultRetryAlgorithm<java.lang.Object>() {
@Override
public boolean shouldRetry(
Throwable previousThrowable, java.lang.Object previousResponse) {
boolean shouldRetry = alg.shouldRetry(previousThrowable, null);
if (previousThrowable != null && !shouldRetry) {
result.setException(previousThrowable);
}
return shouldRetry;
}
};
// The reasoning for 2 elements below allow for a single response and the EOF/error signal
// from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does.
this.queue = new SimpleBlockingQueue<>(2);
Expand Down Expand Up @@ -337,9 +349,6 @@ protected void onErrorImpl(Throwable t) {
}
if (!open.isDone()) {
open.setException(t);
if (!alg.shouldRetry(t, null)) {
result.setException(StorageException.coalesce(t));
}
}
try {
queue.offer(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,20 +1283,18 @@ private static void get(ArrayList<RpcMethodMapping> a) {
(ctx, c) ->
ctx.peek(
state -> {
try {
ReadChannel reader =
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel reader =
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId())) {
WritableByteChannel write = Channels.newChannel(baos);
ByteStreams.copy(reader, write);

assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
} catch (IOException e) {
if (e.getCause() instanceof BaseServiceException) {
throw e.getCause();
}
}
assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
}))
.build());
a.add(
Expand All @@ -1305,23 +1303,46 @@ private static void get(ArrayList<RpcMethodMapping> a) {
(ctx, c) ->
ctx.peek(
state -> {
try {
ReadChannel reader =
ctx.getStorage()
.reader(
ctx.getState().getBlob().getBlobId().getBucket(),
ctx.getState().getBlob().getBlobId().getName());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel reader =
ctx.getStorage()
.reader(
ctx.getState().getBlob().getBlobId().getBucket(),
ctx.getState().getBlob().getBlobId().getName())) {
WritableByteChannel write = Channels.newChannel(baos);
ByteStreams.copy(reader, write);
} catch (IOException e) {
if (e.getCause() instanceof BaseServiceException) {
throw e.getCause();
}
}

assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
}))
.build());
a.add(
RpcMethodMapping.newBuilder(250, objects.get)
.withTest(
(ctx, c) ->
ctx.peek(
state -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel reader =
ctx.getStorage()
.reader(
ctx.getState().getBlob().getBlobId(),
BlobSourceOption.shouldReturnRawInputStream(false))) {
WritableByteChannel write = Channels.newChannel(baos);
ByteStreams.copy(reader, write);
} catch (IOException e) {
if (e.getCause() instanceof BaseServiceException) {
throw e.getCause();
}
}

assertThat(xxd(baos.toByteArray()))
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
}))
.build());
a.add(
Expand Down

0 comments on commit 7dba13c

Please sign in to comment.