Report digest of failed uploads

This will make the error message more useful because otherwise, there
was no way of telling which upload failed or timed out (other than
running with `-j 1`).
Yannic committed Nov 18, 2020
1 parent 59f19eb commit 463954f
Showing 1 changed file with 31 additions and 15 deletions.
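
For context, a minimal caller-side sketch (class names and setup assumed for illustration, not part of this commit) of how the change below surfaces: when an upload future fails, the wrapped IOException now names the digest of the blob that failed or timed out.

```java
// Hypothetical caller sketch; ByteStreamUploader and Chunker are the classes
// touched by the diff below, everything else here is assumed for illustration.
import build.bazel.remote.execution.v2.Digest;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;

class UploadErrorReportingSketch {
  static void uploadAndReport(ByteStreamUploader uploader, Digest digest, Chunker chunker)
      throws InterruptedException {
    ListenableFuture<Void> upload =
        uploader.uploadBlobAsync(digest, chunker, /* forceUpload= */ false);
    try {
      upload.get();
    } catch (ExecutionException e) {
      // Previously this printed only the wrapped StatusRuntimeException (e.g.
      // DEADLINE_EXCEEDED); with this commit the message also identifies the blob:
      // "Error while uploading artifact with digest '<hash>/<sizeBytes>'".
      System.err.println(e.getCause().getMessage());
    }
  }
}
```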
@@ -19,6 +19,7 @@
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;

+ import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
@@ -83,10 +84,10 @@ class ByteStreamUploader extends AbstractReferenceCounted {

/** Contains the hash codes of already uploaded blobs. **/
@GuardedBy("lock")
- private final Set<HashCode> uploadedBlobs = new HashSet<>();
+ private final Set<Digest> uploadedBlobs = new HashSet<>();

@GuardedBy("lock")
- private final Map<HashCode, ListenableFuture<Void>> uploadsInProgress = new HashMap<>();
+ private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>();

@GuardedBy("lock")
private boolean isShutdown;
@@ -199,6 +200,14 @@ void shutdown() {
}
}

+ @Deprecated
+ public ListenableFuture<Void> uploadBlobAsync(
+     HashCode hash, Chunker chunker, boolean forceUpload) {
+   Digest digest =
+       Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
+   return uploadBlobAsync(digest, chunker, forceUpload);
+ }

/**
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
* immediately and one can listen to the returned future for the success/failure of the upload.
@@ -209,32 +218,32 @@ void shutdown() {
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
* performed. This is transparent to the user of this API.
*
- * @param hash the hash of the data to upload.
+ * @param digest the {@link Digest} of the data to upload.
* @param chunker the data to upload.
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public ListenableFuture<Void> uploadBlobAsync(
- HashCode hash, Chunker chunker, boolean forceUpload) {
+ Digest digest, Chunker chunker, boolean forceUpload) {
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");

- if (!forceUpload && uploadedBlobs.contains(hash)) {
+ if (!forceUpload && uploadedBlobs.contains(digest)) {
return Futures.immediateFuture(null);
}

- ListenableFuture<Void> inProgress = uploadsInProgress.get(hash);
+ ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
if (inProgress != null) {
return inProgress;
}

ListenableFuture<Void> uploadResult =
Futures.transform(
- startAsyncUpload(hash, chunker),
+ startAsyncUpload(digest, chunker),
(v) -> {
synchronized (lock) {
- uploadedBlobs.add(hash);
+ uploadedBlobs.add(digest);
}
return null;
},
@@ -244,14 +253,20 @@ public ListenableFuture<Void> uploadBlobAsync(
Futures.catchingAsync(
uploadResult,
StatusRuntimeException.class,
- (sre) -> Futures.immediateFailedFuture(new IOException(sre)),
+ (sre) -> Futures.immediateFailedFuture(
+     new IOException(
+         String.format(
+             "Error while uploading artifact with digest '%s/%s'",
+             digest.getHash(),
+             digest.getSizeBytes()),
+         sre)),
MoreExecutors.directExecutor());

- uploadsInProgress.put(hash, uploadResult);
+ uploadsInProgress.put(digest, uploadResult);
uploadResult.addListener(
() -> {
synchronized (lock) {
- uploadsInProgress.remove(hash);
+ uploadsInProgress.remove(digest);
}
},
MoreExecutors.directExecutor());
@@ -268,24 +283,25 @@ boolean uploadsInProgress() {
}

private static String buildUploadResourceName(
-     String instanceName, UUID uuid, HashCode hash, long size) {
-   String resourceName = format("uploads/%s/blobs/%s/%d", uuid, hash, size);
+     String instanceName, UUID uuid, Digest digest) {
+   String resourceName =
+       format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
return resourceName;
}

/** Starts a file upload and returns a future representing the upload. */
- private ListenableFuture<Void> startAsyncUpload(HashCode hash, Chunker chunker) {
+ private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
try {
chunker.reset();
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}

UUID uploadId = UUID.randomUUID();
- String resourceName = buildUploadResourceName(instanceName, uploadId, hash, chunker.getSize());
+ String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
AsyncUpload newUpload =
new AsyncUpload(
channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
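
A hedged call-site sketch (helper names assumed, not part of this commit) of what migrating off the deprecated HashCode overload looks like: the caller builds the Digest itself, which is exactly the conversion the deprecated bridge method above performs.

```java
// Hypothetical call-site sketch; the conversion mirrors the deprecated
// uploadBlobAsync(HashCode, Chunker, boolean) bridge added in this commit.
import build.bazel.remote.execution.v2.Digest;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.ListenableFuture;

class DigestCallSiteSketch {
  static ListenableFuture<Void> upload(ByteStreamUploader uploader, HashCode hash, Chunker chunker) {
    // The Digest carries both hash and size, which is what the new error message
    // and the upload resource name "uploads/{uuid}/blobs/{hash}/{size}" report.
    Digest digest =
        Digest.newBuilder()
            .setHash(hash.toString())
            .setSizeBytes(chunker.getSize())
            .build();
    return uploader.uploadBlobAsync(digest, chunker, /* forceUpload= */ true);
  }
}
```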
