Skip to content

Commit

Permalink
[remote] upload: treat ALREADY_EXISTS as success (#17732)
Browse files Browse the repository at this point in the history
If the service returns an `ALREADY_EXISTS` error, then we assume that the proper file is present remotely.

Prior art: #12112

Fixes #12111

Closes #17692.

PiperOrigin-RevId: 515339566
Change-Id: Iafdd288148e47197cc047d39c9a5e5b6c95acee1

Co-authored-by: Yannic Bonenberger <[email protected]>
  • Loading branch information
ShreeM01 and Yannic authored Mar 13, 2023
1 parent e362509 commit 718bea2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -389,7 +390,24 @@ private ListenableFuture<Long> upload(long pos) {
channel -> {
SettableFuture<Long> uploadResult = SettableFuture.create();
bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
return uploadResult;
return Futures.catchingAsync(
uploadResult,
Throwable.class,
throwable -> {
Preconditions.checkNotNull(throwable);

Status status = Status.fromThrowable(throwable);
switch (status.getCode()) {
case ALREADY_EXISTS:
// Server indicated the blob already exists, so we translate the error to a
// successful upload.
return Futures.immediateFuture(chunker.getSize());

default:
return Futures.immediateFailedFuture(throwable);
}
},
MoreExecutors.directExecutor());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,12 @@ public void testUploadCacheMissesWithRetries() throws Exception {
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
final Digest bazDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("baz"), "z");
final Digest foobarDigest =
fakeFileCache.createScratchInput(ActionInputHelper.fromPath("foobar"), "foobar");
final Path fooFile = execRoot.getRelative("a/foo");
final Path barFile = execRoot.getRelative("bar");
final Path bazFile = execRoot.getRelative("baz");
final Path foobarFile = execRoot.getRelative("foobar");
ActionKey actionKey = DIGEST_UTIL.asActionKey(fooDigest); // Could be any key.
barFile.setExecutable(true);
serviceRegistry.addService(
Expand All @@ -770,6 +773,7 @@ public void findMissingBlobs(
.addMissingBlobDigests(fooDigest)
.addMissingBlobDigests(barDigest)
.addMissingBlobDigests(bazDigest)
.addMissingBlobDigests(foobarDigest)
.build());
responseObserver.onCompleted();
} else {
Expand All @@ -783,6 +787,7 @@ public void findMissingBlobs(
rb.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest).setIsExecutable(true);
rb.addOutputFilesBuilder().setPath("bar").setDigest(barDigest).setIsExecutable(true);
rb.addOutputFilesBuilder().setPath("baz").setDigest(bazDigest).setIsExecutable(true);
rb.addOutputFilesBuilder().setPath("foobar").setDigest(foobarDigest).setIsExecutable(true);
ActionResult result = rb.build();
serviceRegistry.addService(
new ActionCacheImplBase() {
Expand Down Expand Up @@ -838,6 +843,9 @@ public void onNext(WriteRequest request) {
} else if (resourceName.contains(bazDigest.getHash())) {
assertThat(dataStr).isEqualTo("z");
size = 1;
} else if (resourceName.contains(foobarDigest.getHash())) {
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
return;
} else {
fail("Unexpected resource name in upload: " + resourceName);
}
Expand Down Expand Up @@ -875,9 +883,9 @@ public void onError(Throwable t) {
actionKey,
Action.getDefaultInstance(),
Command.getDefaultInstance(),
ImmutableList.<Path>of(fooFile, barFile, bazFile));
// 4 times for the errors, 3 times for the successful uploads.
Mockito.verify(mockByteStreamImpl, Mockito.times(7))
ImmutableList.<Path>of(fooFile, barFile, bazFile, foobarFile));
// 4 times for the errors, 4 times for the successful uploads.
Mockito.verify(mockByteStreamImpl, Mockito.times(8))
.write(ArgumentMatchers.<StreamObserver<WriteResponse>>any());
}

Expand Down

0 comments on commit 718bea2

Please sign in to comment.