From 69439c42a41283d25e0d29344a2817481af04dcf Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 9 Oct 2024 17:24:27 +0000 Subject: [PATCH 1/2] fix: update gRPC ReadObject retry to avoid double retry Prevent adding an unavailable error to the queue if the retrying loop is still active. Also, cleanup unused retry config for gRPC ReadObject. --- .../GapicUnbufferedReadableByteChannel.java | 21 +++++++++++++++---- .../google/cloud/storage/GrpcStorageImpl.java | 15 +++---------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 43270a1aae..112d19c531 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; final class GapicUnbufferedReadableByteChannel implements UnbufferedReadableByteChannel, ScatteringByteChannel { @@ -258,6 +259,11 @@ ApiFuture getResult() { private void ensureStreamOpen() { if (readObjectObserver == null) { + java.lang.Object peek = queue.peek(); + if (peek instanceof Throwable || peek == EOF_MARKER) { + // If our queue has an error or EOF, do not send another request + return; + } readObjectObserver = Retrying.run( retryingDeps, @@ -326,13 +332,15 @@ protected void onResponseImpl(ReadObjectResponse response) { @Override protected void onErrorImpl(Throwable t) { - open.setException(t); - if (!alg.shouldRetry(t, null)) { - result.setException(StorageException.coalesce(t)); - } if (t instanceof CancellationException) { cancellation.set(t); } + if (!open.isDone()) { + open.setException(t); + if (!alg.shouldRetry(t, null)) { + result.setException(StorageException.coalesce(t)); + } + } try { queue.offer(t); } catch (InterruptedException e) { @@ -369,6 +377,11 @@ public boolean nonEmpty() { return !queue.isEmpty(); } + @Nullable + public T peek() { + return queue.peek(); + } + @NonNull public T poll() throws InterruptedException { return queue.take(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 35ba536aef..707770d509 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -723,8 +723,7 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption.. public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); ReadObjectRequest request = getReadObjectRequest(blob, opts); - Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request)); - GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes); + GrpcCallContext grpcCallContext = Retrying.newCallContext(); return new GrpcBlobReadChannel( storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), @@ -1708,10 +1707,7 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts); - Set codes = - resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest)); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes)); + GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(Retrying.newCallContext()); return ResumableMedia.gapic() .read() .byteChannel( @@ -1728,16 +1724,11 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( @VisibleForTesting ApiFuture startResumableWrite( GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts opts) { - Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() .write() .resumableWrite( - storageClient - .startResumableWriteCallable() - .withDefaultCallContext(merge.withRetryableCodes(codes)), - req, - opts); + storageClient.startResumableWriteCallable().withDefaultCallContext(merge), req, opts); } ApiFuture startResumableWrite( From 4c0951c1bfdc7df3c1a4fdc45252395fbb2b01e2 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 10 Oct 2024 15:44:34 -0400 Subject: [PATCH 2/2] test: fix tests --- .../java/com/google/cloud/storage/GrpcStorageImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 707770d509..6dff8996e9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -1724,11 +1724,16 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( @VisibleForTesting ApiFuture startResumableWrite( GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts opts) { + Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); return ResumableMedia.gapic() .write() .resumableWrite( - storageClient.startResumableWriteCallable().withDefaultCallContext(merge), req, opts); + storageClient + .startResumableWriteCallable() + .withDefaultCallContext(merge.withRetryableCodes(codes)), + req, + opts); } ApiFuture startResumableWrite(