Skip to content

Commit

Permalink
registry: Introduce hash-wasm so we can store the hash state and we can continue hashing from where we left it
Browse files Browse the repository at this point in the history
  • Loading branch information
gabivlj committed Nov 27, 2024
1 parent 9511163 commit 214f714
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 10 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"dependencies": {
"@cfworker/base64url": "^1.12.5",
"@taylorzane/hash-wasm": "^0.0.11",
"@tsndr/cloudflare-worker-jwt": "^2.5.1",
"itty-router": "^4.0.27",
"zod": "^3.22.4"
Expand Down
8 changes: 8 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

98 changes: 88 additions & 10 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createSHA256, IHasher, loadWasm } from "@taylorzane/hash-wasm";
import { Env } from "../..";
import jwt from "@tsndr/cloudflare-worker-jwt";
import {
Expand Down Expand Up @@ -30,6 +31,38 @@ import {
import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector";
import { ManifestSchema, manifestSchema } from "../manifest";
import { DigestInvalid, RegistryResponseJSON } from "../v2-responses";
// @ts-expect-error: No declaration file for module
import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm";

/**
 * Feeds a byte stream into a SHA-256 hasher, optionally resuming from a previously saved state.
 *
 * @param readableStream - Stream to hash; it is consumed with a BYOB reader. Pass `null` to only
 *   restore (or initialise) the hasher without reading any data.
 * @param state - Serialized hasher state to resume from (the bytes returned by `IHasher.save()`).
 *   When omitted, the hasher starts from a fresh `init()`.
 * @returns The hasher after every chunk of the stream has been passed to `update()`. The caller
 *   decides whether to `save()` the state or `digest()` it.
 * @throws Rethrows any error raised while reading the stream or updating the hasher. In that case
 *   the stream is cancelled first so an unread tee branch does not keep buffering.
 */
export async function hash(readableStream: ReadableStream | null, state?: Uint8Array): Promise<IHasher> {
  loadWasm({ sha256: sha256Wasm });
  let hasher = await createSHA256();
  if (state !== undefined) {
    hasher.load(state);
  } else {
    hasher = hasher.init();
  }

  if (readableStream === null) {
    return hasher;
  }

  const reader = readableStream.getReader({ mode: "byob" });
  // Read at most 5MB at a time so we don't hold much memory at once (the Workers runtime has
  // awkward constraints with tee() when the other side of the stream is very slow).
  let view = new Uint8Array(1024 * 1024 * 5);
  try {
    for (;;) {
      const result = await reader.read(view);
      if (result.done) break;
      hasher.update(result.value);
      // A BYOB read transfers the supplied buffer and hands it back through the result, so wrap
      // that same buffer again instead of allocating another 5MB for every chunk.
      view = new Uint8Array(result.value.buffer);
    }
  } catch (err) {
    try {
      // Signal that this consumer is gone so the stream stops queueing data for it.
      await reader.cancel(err);
    } catch {
      // The stream is already errored; the original failure below is the one worth reporting.
    }
    throw err;
  } finally {
    // Always give the lock back, on success and on failure alike.
    reader.releaseLock();
  }

  return hasher;
}

/**
 * Decodes a base64-encoded hasher state into raw bytes.
 *
 * @param hashState - Base64 text, as produced by `intoBase64FromUint8Array`.
 * @returns A plain `Uint8Array` holding a copy of the decoded bytes.
 */
export function hashStateToUint8Array(hashState: string): Uint8Array {
  // Copy out of the Buffer so callers receive a plain Uint8Array rather than a Buffer.
  return Uint8Array.from(Buffer.from(hashState, "base64"));
}

/**
 * Encodes raw bytes (for example a saved hasher state) as base64 text.
 *
 * @param array - Bytes to encode; only the range covered by the view is used.
 * @returns The base64 representation of `array`.
 */
export function intoBase64FromUint8Array(array: Uint8Array): string {
  // Wrap exactly the bytes the view covers, honouring its offset and length.
  const bytes = Buffer.from(array.buffer, array.byteOffset, array.byteLength);
  return bytes.toString("base64");
}

export type Chunk =
| {
Expand Down Expand Up @@ -66,6 +99,7 @@ export type State = {
registryUploadId: string;
byteRange: number;
name: string;
hashState?: string;
};

export function getRegistryUploadsPath(state: { registryUploadId: string; name: string }): string {
Expand Down Expand Up @@ -686,12 +720,48 @@ export class R2Registry implements Registry {
};
}

const res = await appendStreamKnownLength(stream, length);
let hasherPromise: Promise<IHasher> | undefined;
if (
length <= MAXIMUM_CHUNK &&
// if starting, or already started.
(state.parts.length === 0 || (state.parts.length > 0 && state.hashState !== undefined))
) {
const [s1, s2] = stream.tee();
stream = s1;
let bytes: undefined | Uint8Array;
if (state.hashState !== undefined) {
bytes = hashStateToUint8Array(state.hashState);
}

hasherPromise = hash(s2, bytes);
} else {
state.hashState = undefined;
}

const [res, hasherResponse] = await Promise.allSettled([appendStreamKnownLength(stream, length), hasherPromise]);
state.byteRange += length;
if (res instanceof RangeError)
if (res.status === "rejected") {
return {
response: res,
response: new InternalError(),
};
}

if (res.value instanceof RangeError) {
return {
response: res.value,
};
}

if (hasherPromise !== undefined && hasherResponse !== undefined) {
if (hasherResponse.status === "rejected") {
throw hasherResponse.reason;
}

if (hasherResponse.value === undefined) throw new Error("unreachable");

const value = hasherResponse.value.save();
state.hashState = intoBase64FromUint8Array(value);
}

const hashedJwtState = await encodeState(state, env);
return {
Expand Down Expand Up @@ -758,16 +828,24 @@ export class R2Registry implements Registry {
};
}

const target = `${namespace}/blobs/${expectedSha}`;
const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000;
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT && state.hashState === undefined) {
console.error(`The maximum size of an R2 object is 5gb, multipart uploads don't
have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb`);
return {
response: new InternalError(),
};
}

const target = `${namespace}/blobs/${expectedSha}`;
// If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
// stream and create a reference from the blobs path to the
// upload path. In R2, moving objects mean copying the stream, which
// doesn't really work if it's above 5GB due to R2 limits.
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) {
const compressionStream = new crypto.DigestStream("SHA-256");
obj.body.pipeTo(compressionStream);
const digest = hexToDigest(await compressionStream.digest);
// upload path. That's why we need hash-wasm, as it allows you to store the state.
if (state.hashState !== undefined) {
const stateEncoded = hashStateToUint8Array(state.hashState);
const hasher = await hash(null, stateEncoded);
const digest = hasher.digest("hex");

if (digest !== expectedSha) {
return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) };
}
Expand Down

0 comments on commit 214f714

Please sign in to comment.