Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new-style digest functions #18731

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.DigestFunction;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
Expand Down Expand Up @@ -69,6 +70,7 @@ final class ByteStreamUploader {
private final CallCredentialsProvider callCredentialsProvider;
private final long callTimeoutSecs;
private final RemoteRetrier retrier;
private final DigestFunction.Value digestFunction;

@Nullable private final Semaphore openedFilePermits;

Expand All @@ -89,14 +91,16 @@ final class ByteStreamUploader {
CallCredentialsProvider callCredentialsProvider,
long callTimeoutSecs,
RemoteRetrier retrier,
int maximumOpenFiles) {
int maximumOpenFiles,
DigestFunction.Value digestFunction) {
checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
this.instanceName = instanceName;
this.channel = channel;
this.callCredentialsProvider = callCredentialsProvider;
this.callTimeoutSecs = callTimeoutSecs;
this.retrier = retrier;
this.openedFilePermits = maximumOpenFiles != -1 ? new Semaphore(maximumOpenFiles) : null;
this.digestFunction = digestFunction;
}

@VisibleForTesting
Expand Down Expand Up @@ -175,11 +179,34 @@ public ListenableFuture<Void> uploadBlobAsync(
MoreExecutors.directExecutor());
}

private static String buildUploadResourceName(
private boolean isOldStyleDigestFunction() {
// Old-style digest functions (SHA256, etc) are distinguishable by the length
// of their hash alone and do not require extra specification, but newer
// digest functions (which may have the same length hashes as the older
// functions!) must be explicitly specified in the upload resource name.
return digestFunction.getNumber() <= 7;
}

private String buildUploadResourceName(
Copy link
Member

@coeuvre coeuvre Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may also want to change GrpcCacheClient#getResourceName, maybe in another PR.

Copy link
Contributor Author

@tylerwilliams tylerwilliams Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may also want to change GrpcCacheClient#getResourceName, maybe in another PR.

Yeah it's a bit cyclical; I'm happy to add support for it here so that download and upload are both kept in sync -- which I've done.

Adding another test for the download path will be simple once there is a DigestHashFunction for BLAKE3. Will send that in a followup after adding the hasher.

String instanceName, UUID uuid, Digest digest, boolean compressed) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());

String resourceName;

if (isOldStyleDigestFunction()) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());
} else {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%s/%d" : "uploads/%s/blobs/%s/%s/%d";
resourceName =
format(
template,
uuid,
digestFunction.getValueDescriptor().getName().toLowerCase(),
digest.getHash(),
digest.getSizeBytes());
}
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.DigestFunction;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
Expand Down Expand Up @@ -107,7 +108,8 @@ public GrpcCacheClient(
callCredentialsProvider,
options.remoteTimeout.getSeconds(),
retrier,
options.maximumOpenFiles);
options.maximumOpenFiles,
digestUtil.getDigestFunction());
maxMissingBlobsDigestsPerMessage = computeMaxMissingBlobsDigestsPerMessage();
Preconditions.checkState(
maxMissingBlobsDigestsPerMessage > 0, "Error: gRPC message size too small.");
Expand Down Expand Up @@ -352,12 +354,24 @@ private ListenableFuture<Void> downloadBlob(
MoreExecutors.directExecutor());
}

public static String getResourceName(String instanceName, Digest digest, boolean compressed) {
private static boolean isOldStyleDigestFunction(DigestFunction.Value digestFunction) {
// Old-style digest functions (SHA256, etc) are distinguishable by the length
// of their hash alone and do not require extra specification, but newer
// digest functions (which may have the same length hashes as the older
// functions!) must be explicitly specified in the upload resource name.
return digestFunction.getNumber() <= 7;
}

public static String getResourceName(
String instanceName, Digest digest, boolean compressed, DigestFunction.Value digestFunction) {
String resourceName = "";
if (!instanceName.isEmpty()) {
resourceName += instanceName + "/";
}
resourceName += compressed ? "compressed-blobs/zstd/" : "blobs/";
if (!isOldStyleDigestFunction(digestFunction)) {
resourceName += digestFunction.getValueDescriptor().getName().toLowerCase() + "/";
}
return resourceName + DigestUtil.toString(digest);
}

Expand All @@ -369,7 +383,11 @@ private ListenableFuture<Long> requestRead(
@Nullable Supplier<Digest> digestSupplier,
Channel channel) {
String resourceName =
getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
getResourceName(
options.remoteInstanceName,
digest,
options.cacheCompression,
digestUtil.getDigestFunction());
SettableFuture<Long> future = SettableFuture.create();
OutputStream out;
try {
Expand Down
Loading