Skip to content

Commit

Permalink
fix: update implementation of readAllBytes and downloadTo to be more …
Browse files Browse the repository at this point in the history
…robust to retryable errors (#2305)

Additional small changes to bring http and grpc implementation into conformance with each other.

Much of this also serves as pre-work to the grpc retry conformance tests enablement after the next release of testbench.
  • Loading branch information
BenWhitehead authored Nov 17, 2023
1 parent 29f4ea6 commit 21821da
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ public final synchronized int write(ByteBuffer src) throws IOException {
}
int write = tmp.write(src);
return write;
} catch (StorageException e) {
throw new IOException(e);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
throw StorageException.coalesce(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,34 @@ public Blob create(

@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
return createFrom(blobInfo, content, options);
} catch (IOException e) {
requireNonNull(blobInfo, "blobInfo must be non null");

Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.enabled())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.direct()
.unbuffered()
.setRequest(req)
.build();

// Specifically not in the try-with, so we don't close the provided stream
ReadableByteChannel src =
Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)));
try (UnbufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
}
return getBlob(session.getResult());
}

@Override
Expand Down Expand Up @@ -309,7 +332,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
}
return codecs.blobInfo().decode(object).asBlob(this);
} catch (InterruptedException | ExecutionException e) {
throw StorageException.coalesce(e);
throw StorageException.coalesce(e.getCause());
}
}

Expand Down Expand Up @@ -359,7 +382,14 @@ public Blob createFrom(
@Override
public Bucket get(String bucket, BucketGetOption... options) {
Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
return internalBucketGet(bucket, unwrap);
try {
return internalBucketGet(bucket, unwrap);
} catch (StorageException e) {
if (e.getCode() == 404) {
return null;
}
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.callable;

import com.google.api.core.ApiFuture;
import com.google.api.gax.paging.Page;
Expand All @@ -37,12 +36,14 @@
import com.google.cloud.Policy;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
import com.google.cloud.storage.PostPolicyV4.PostFieldsV4;
import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
Expand All @@ -59,9 +60,10 @@
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -73,6 +75,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -604,9 +607,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap);
return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity());
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(
new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange()))
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(baos)) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
return baos.toByteArray();
}

@Override
Expand Down Expand Up @@ -638,19 +657,26 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {

@Override
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
final StorageObject pb = codecs.blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap =
Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap);
run(
algorithm,
callable(
() -> {
storageRpc.read(
pb, optionsMap, countingOutputStream.getCount(), countingOutputStream);
}),
Function.identity());
Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange()))
.build();
// don't close the provided stream
WritableByteChannel w = Channels.newChannel(outputStream);
try (UnbufferedReadableByteChannel r = session.open()) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.storage.ByteSizeConstants._2MiB;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
Expand Down Expand Up @@ -81,16 +82,19 @@ public void create_bytes() throws Exception {

@Test
public void create_inputStream() throws Exception {
Resumable.FakeService service = Resumable.FakeService.create();
Direct.FakeService service = Direct.FakeService.create();
try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize);
FakeServer server = FakeServer.of(service);
Storage s = server.getGrpcStorageOptions().getService();
InputStream in = Channels.newInputStream(tmpFile.reader())) {
BlobInfo info = BlobInfo.newBuilder("buck", "obj").build();
s.create(info, in, BlobWriteOption.doesNotExist());
// create uses a direct upload, once the stream is consumed there is no means for us to retry
// if an error happens it should be surfaced
StorageException se =
assertThrows(
StorageException.class, () -> s.create(info, in, BlobWriteOption.doesNotExist()));
assertThat(se.getCode()).isEqualTo(500);
}

assertThat(service.returnError.get()).isFalse();
}

@Test
Expand Down
Loading

0 comments on commit 21821da

Please sign in to comment.