Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update request handling of gRPC based CopyWriter #2858

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ final class GapicCopyWriter extends CopyWriter {
private final GrpcStorageOptions options;
private final UnaryCallable<RewriteObjectRequest, RewriteResponse> callable;
private final ResultRetryAlgorithm<?> alg;
private final RewriteObjectRequest originalRequest;
private final RewriteResponse initialResponse;

private RewriteResponse mostRecentResponse;
Expand All @@ -39,13 +40,15 @@ final class GapicCopyWriter extends CopyWriter {
GrpcStorageImpl storage,
UnaryCallable<RewriteObjectRequest, RewriteResponse> callable,
ResultRetryAlgorithm<?> alg,
RewriteObjectRequest originalRequest,
RewriteResponse initialResponse) {
this.storage = storage;
this.options = storage.getOptions();
this.callable = callable;
this.alg = alg;
this.initialResponse = initialResponse;
this.mostRecentResponse = initialResponse;
this.originalRequest = originalRequest;
}

@Override
Expand Down Expand Up @@ -76,9 +79,7 @@ public long getTotalBytesCopied() {
public void copyChunk() {
if (!isDone()) {
RewriteObjectRequest req =
RewriteObjectRequest.newBuilder()
.setRewriteToken(mostRecentResponse.getRewriteToken())
.build();
originalRequest.toBuilder().setRewriteToken(mostRecentResponse.getRewriteToken()).build();
GrpcCallContext retryContext = Retrying.newCallContext();
mostRecentResponse =
Retrying.run(options, alg, () -> callable.call(req, retryContext), Decoder.identity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,17 +656,24 @@ public CopyWriter copy(CopyRequest copyRequest) {
RewriteObjectRequest.newBuilder()
.setDestinationName(dstProto.getName())
.setDestinationBucket(dstProto.getBucket())
// destination_kms_key comes from dstOpts
// according to the docs in the protos, it is illegal to populate the following fields,
// clear them out if they are set
// destination_predefined_acl comes from dstOpts
// if_*_match come from srcOpts and dstOpts
// copy_source_encryption_* come from srcOpts
// common_object_request_params come from dstOpts
.setDestination(dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build())
.setSourceBucket(srcProto.getBucket())
.setSourceObject(srcProto.getName());

// according to the docs in the protos, it is illegal to populate the following fields,
// clear them out if they are set
// * destination_kms_key comes from dstOpts
// * destination_predefined_acl comes from dstOpts
// * if_*_match come from srcOpts and dstOpts
// * copy_source_encryption_* come from srcOpts
// * common_object_request_params come from dstOpts
Object cleanedDst = dstProto.toBuilder().clearName().clearBucket().clearKmsKey().build();
// only set the destination if it is not equal to the default instance
// otherwise we will clobber default values populated in the gcs server side for the object
// metadata
if (!cleanedDst.equals(Object.getDefaultInstance())) {
b.setDestination(cleanedDst);
}

if (src.getGeneration() != null) {
b.setSourceGeneration(src.getGeneration());
}
Expand All @@ -685,7 +692,8 @@ public CopyWriter copy(CopyRequest copyRequest) {
getOptions(),
retryAlgorithmManager.getFor(req),
() -> callable.call(req, retryContext),
(resp) -> new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), resp));
(resp) ->
new GapicCopyWriter(this, callable, retryAlgorithmManager.idempotent(), req, resp));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.PackagePrivateMethodWorkarounds;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
Expand Down Expand Up @@ -562,7 +563,6 @@ public void testListBlobsCurrentDirectoryIncludesBothObjectsAndSyntheticDirector
}

@Test
// When gRPC support is added for matchGlob, enable this test for gRPC.
public void testListBlobsWithMatchGlob() throws Exception {
BucketInfo bucketInfo = BucketInfo.newBuilder(generator.randomBucketName()).build();
try (TemporaryBucket tempBucket =
Expand Down Expand Up @@ -848,8 +848,6 @@ public void testComposeBlobFail() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlob() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -872,8 +870,35 @@ public void testCopyBlob() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void copyBlob_classChange_multipleChunks() {

String sourceBlobName = generator.randomObjectName() + "-source";
BlobId source = BlobId.of(bucket.getName(), sourceBlobName);
ImmutableMap<String, String> metadata = ImmutableMap.of("k", "v");
BlobInfo blob = BlobInfo.newBuilder(source).setMetadata(metadata).build();
int _5MiB = 5 * 1024 * 1024;
byte[] bytes = DataGenerator.base64Characters().genBytes(_5MiB);
Blob remoteBlob = storage.create(blob, bytes);
assertThat(remoteBlob).isNotNull();
String targetBlobName = generator.randomObjectName() + "-target";
CopyRequest req =
CopyRequest.newBuilder()
.setSource(source)
.setTarget(
BlobInfo.newBuilder(bucket, targetBlobName)
// change the storage class to force GCS to copy bytes
.setStorageClass(StorageClass.NEARLINE)
.build(),
BlobTargetOption.doesNotExist())
.setMegabytesCopiedPerChunk(2L)
.build();
CopyWriter copyWriter = storage.copy(req);
BlobInfo remoteBlob2 = copyWriter.getResult();
assertThat(copyWriter.isDone()).isTrue();
assertThat(remoteBlob2).isNotNull();
}

@Test
public void testCopyBlobWithPredefinedAcl() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand Down Expand Up @@ -903,8 +928,6 @@ public void testCopyBlobWithPredefinedAcl() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobWithEncryptionKeys() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand Down Expand Up @@ -955,8 +978,6 @@ public void testCopyBlobWithEncryptionKeys() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobUpdateMetadata() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -981,9 +1002,7 @@ public void testCopyBlobUpdateMetadata() {
assertTrue(storage.delete(bucket.getName(), targetBlobName));
}

// Re-enable this test when it stops failing
// @Test
@Exclude(transports = Transport.GRPC)
@Test
public void testCopyBlobUpdateStorageClass() {
String sourceBlobName = generator.randomObjectName() + "-source";
BlobId source = BlobId.of(bucket.getName(), sourceBlobName);
Expand All @@ -1007,8 +1026,6 @@ public void testCopyBlobUpdateStorageClass() {
}

@Test
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobNoContentType() {

String sourceBlobName = generator.randomObjectName() + "-source";
Expand All @@ -1022,17 +1039,16 @@ public void testCopyBlobNoContentType() {
CopyWriter copyWriter = storage.copy(req);
assertEquals(bucket.getName(), copyWriter.getResult().getBucket());
assertEquals(targetBlobName, copyWriter.getResult().getName());
assertNull(copyWriter.getResult().getContentType());
assertTrue(
copyWriter.getResult().getContentType() == null
|| copyWriter.getResult().getContentType().isEmpty());
assertEquals(metadata, copyWriter.getResult().getMetadata());
assertTrue(copyWriter.isDone());
assertTrue(remoteSourceBlob.delete());
assertTrue(storage.delete(bucket.getName(), targetBlobName));
}

@Test
// Verified against testbench
// Bucket attribute extration on allowlist bug b/246634709
@Exclude(transports = Transport.GRPC)
public void testCopyBlobFail() {

String sourceBlobName = "test-copy-blob-source-fail";
Expand Down
Loading