Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow specifying an expected object size for resumable operations. #2661

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);

ApiFuture<BidiResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
grpc.startResumableWrite(grpcCallContext, req, opts);
return ResumableMedia.gapic()
.write()
.bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
WriteObjectRequest req = grpc.getWriteObjectRequest(info, opts);

ApiFuture<ResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
grpc.startResumableWrite(grpcCallContext, req, opts);
return ResumableMedia.gapic()
.write()
.byteChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.OutOfRangeException;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
Expand Down Expand Up @@ -345,10 +346,17 @@ public void onNext(BidiWriteObjectResponse value) {
public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
clientDetectedError(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
ImmutableList.of(lastWrittenRequest), null, context, oore));
} else if (t instanceof ApiException) {
ErrorDetails ed = oore.getErrorDetails();
if (!(ed != null
&& ed.getErrorInfo() != null
&& ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) {
clientDetectedError(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get some comments around what Scenario 5 is to make it easier to read?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i see this was there before as well. my apologies. that can be a followup.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResumableSessionFailureScenario is an enum that is shared by several of our upload implementations and defines the types of failure scenarios that can happen when processing an upload request.

SCENARIO_5 is client side data loss detected

SCENARIO_5(
BaseServiceException.UNKNOWN_CODE,
"dataLoss",
"Client side data loss detected. Attempt to append to a resumable session with an offset higher than the backend has"),

ImmutableList.of(lastWrittenRequest), null, context, oore));
return;
}
}
if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.OutOfRangeException;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
Expand Down Expand Up @@ -267,11 +268,18 @@ public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
open = false;
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
} else if (t instanceof ApiException) {
ErrorDetails ed = oore.getErrorDetails();
if (!(ed != null
&& ed.getErrorInfo() != null
&& ed.getErrorInfo().getReason().equals("GRPC_MISMATCHED_UPLOAD_SIZE"))) {
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
return;
}
}
if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.BidiWriteObjectResponse;
Expand Down Expand Up @@ -50,7 +52,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(

ApiFuture<ResumableWrite> resumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
WriteObjectRequest writeObjectRequest) {
WriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -61,7 +64,7 @@ ApiFuture<ResumableWrite> resumableWrite(
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build();
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
Expand All @@ -80,7 +83,8 @@ ApiFuture<ResumableWrite> resumableWrite(

ApiFuture<BidiResumableWrite> bidiResumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
BidiWriteObjectRequest writeObjectRequest) {
BidiWriteObjectRequest writeObjectRequest,
Opts<ObjectTargetOpt> opts) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
Expand All @@ -91,7 +95,7 @@ ApiFuture<BidiResumableWrite> bidiResumableWrite(
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
StartResumableWriteRequest req = opts.startResumableWriteRequest().apply(b).build();
Function<String, BidiWriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);
ApiFuture<GrpcResumableSession> session2 =
ApiFutures.transform(
start,
Expand Down Expand Up @@ -365,7 +365,7 @@ public Blob createFrom(
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req, opts);

BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
Expand Down Expand Up @@ -790,7 +790,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req);
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
Expand Down Expand Up @@ -1919,7 +1919,7 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(

@VisibleForTesting
ApiFuture<ResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, WriteObjectRequest req) {
GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
Expand All @@ -1928,11 +1928,12 @@ ApiFuture<ResumableWrite> startResumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
req);
req,
opts);
}

ApiFuture<BidiResumableWrite> startResumableWrite(
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req) {
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return ResumableMedia.gapic()
Expand All @@ -1941,7 +1942,8 @@ ApiFuture<BidiResumableWrite> startResumableWrite(
storageClient
.startResumableWriteCallable()
.withDefaultCallContext(merge.withRetryableCodes(codes)),
req);
req,
opts);
}

private SourceObject sourceObjectEncode(SourceBlob from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
ApiFuture<ResumableWrite> f =
grpcStorage.startResumableWrite(
grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts));
grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts);
ApiFuture<WriteCtx<ResumableWrite>> start =
ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public void rewindTo(long offset) {
&& contentLength != null
&& contentLength > 0) {
String errorMessage = cause.getContent().toLowerCase(Locale.US);
if (errorMessage.contains("content-range")) {
if (errorMessage.contains("content-range")
&& !errorMessage.contains("earlier")) { // TODO: exclude "earlier request"
StorageException se =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
uploadId, response, cause, cause::getContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.UnifiedOpts.ResumableUploadExpectedObjectSize;
import com.google.cloud.storage.UnifiedOpts.SourceGenerationMatch;
import com.google.cloud.storage.UnifiedOpts.SourceGenerationNotMatch;
import com.google.cloud.storage.UnifiedOpts.SourceMetagenerationMatch;
Expand Down Expand Up @@ -102,7 +103,8 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
|| o instanceof SourceMetagenerationMatch
|| o instanceof SourceMetagenerationNotMatch
|| o instanceof Crc32cMatch
|| o instanceof Md5Match;
|| o instanceof Md5Match
|| o instanceof ResumableUploadExpectedObjectSize;
TO_EXCLUDE_FROM_PARTS = tmp.negate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,23 @@ public static BlobWriteOption detectContentType() {
return new BlobWriteOption(UnifiedOpts.detectContentType());
}

/**
* Set a precondition on the number of bytes that GCS should expect for a resumable upload. See
* the docs for <a
* href="https://cloud.google.com/storage/docs/json_api/v1/parameters#xuploadcontentlength">X-Upload-Content-Length</a>
* for more detail.
*
* <p>If the method invoked with this option does not perform a resumable upload, this option
* will be ignored.
*
* @since 2.42.0
*/
@BetaApi
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public static BlobWriteOption expectedObjectSize(long objectContentSize) {
return new BlobWriteOption(UnifiedOpts.resumableUploadExpectedObjectSize(objectContentSize));
}

/**
* Deduplicate any options which are the same parameter. The value which comes last in {@code
* os} will be the value included in the return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.RestoreObjectRequest;
import com.google.storage.v2.RewriteObjectRequest;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.UpdateBucketRequest;
import com.google.storage.v2.UpdateHmacKeyRequest;
import com.google.storage.v2.UpdateObjectRequest;
Expand Down Expand Up @@ -196,6 +197,10 @@ default Mapper<ComposeObjectRequest.Builder> composeObject() {
default Mapper<RewriteObjectRequest.Builder> rewriteObject() {
return Mapper.identity();
}

default Mapper<StartResumableWriteRequest.Builder> startResumableWrite() {
return Mapper.identity();
}
}

/** Base interface for those Opts which are applicable to Bucket List operations */
Expand Down Expand Up @@ -487,6 +492,12 @@ static Projection projection(@NonNull String projection) {
return new Projection(projection);
}

static ResumableUploadExpectedObjectSize resumableUploadExpectedObjectSize(
long expectedObjectSize) {
checkArgument(expectedObjectSize >= 0, "expectedObjectSize >= 0 (%s >= 0)", expectedObjectSize);
return new ResumableUploadExpectedObjectSize(expectedObjectSize);
}

static SoftDeleted softDeleted(boolean softDeleted) {
return new SoftDeleted(softDeleted);
}
Expand Down Expand Up @@ -1832,6 +1843,25 @@ public Mapper<UpdateObjectRequest.Builder> updateObject() {
}
}

static final class ResumableUploadExpectedObjectSize extends RpcOptVal<@NonNull Long>
implements ObjectTargetOpt {
private static final long serialVersionUID = 3640126281492196357L;

private ResumableUploadExpectedObjectSize(@NonNull Long val) {
super(StorageRpc.Option.X_UPLOAD_CONTENT_LENGTH, val);
}

@Override
public Mapper<StartResumableWriteRequest.Builder> startResumableWrite() {
return b -> {
if (val > 0) {
b.getWriteObjectSpecBuilder().setObjectSize(val);
}
return b;
};
}
}

static final class ShowDeletedKeys extends RpcOptVal<@NonNull Boolean> implements HmacKeyListOpt {
private static final long serialVersionUID = -6604176744362903487L;

Expand Down Expand Up @@ -2426,6 +2456,10 @@ Mapper<BidiWriteObjectRequest.Builder> bidiWriteObjectRequest() {
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::bidiWriteObject);
}

Mapper<StartResumableWriteRequest.Builder> startResumableWriteRequest() {
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::startResumableWrite);
}

Mapper<GetObjectRequest.Builder> getObjectsRequest() {
return fuseMappers(ObjectSourceOpt.class, ObjectSourceOpt::getObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,10 @@ public String open(StorageObject object, Map<Option, ?> options) {
requestFactory.buildPostRequest(url, new JsonHttpContent(jsonFactory, object));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set("X-Upload-Content-Type", detectContentType(object, options));
Long xUploadContentLength = Option.X_UPLOAD_CONTENT_LENGTH.getLong(options);
if (xUploadContentLength != null) {
requestHeaders.set("X-Upload-Content-Length", xUploadContentLength);
}
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 200) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ enum Option {
SOFT_DELETED("softDeleted"),
COPY_SOURCE_ACL("copySourceAcl"),
GENERATION("generation"),
INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes");
INCLUDE_FOLDERS_AS_PREFIXES("includeFoldersAsPrefixes"),
X_UPLOAD_CONTENT_LENGTH("x-upload-content-length");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.WriteObjectRequest;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void syntax_directBuffered_fluent() {
@Test
public void syntax_resumableUnbuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand All @@ -110,7 +111,7 @@ public void syntax_resumableUnbuffered_fluent() {
@Test
public void syntax_resumableBuffered_fluent() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -150,7 +151,7 @@ public void syntax_directBuffered_incremental() {
@Test
public void syntax_resumableUnbuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand All @@ -164,7 +165,7 @@ public void syntax_resumableUnbuffered_incremental() {
@Test
public void syntax_resumableBuffered_incremental() {
ApiFuture<ResumableWrite> startAsync =
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req);
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
GapicWritableByteChannelSessionBuilder b1 =
ResumableMedia.gapic()
.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception {

ApiFuture<ResumableWrite> f =
storage.startResumableWrite(
GrpcCallContext.createDefault(), storage.getWriteObjectRequest(info, Opts.empty()));
GrpcCallContext.createDefault(),
storage.getWriteObjectRequest(info, Opts.empty()),
Opts.empty());
ResumableWrite resumableWrite = ApiExceptions.callAndTranslateApiException(f);

UploadCtx uploadCtx =
Expand Down
Loading
Loading