Skip to content

Commit

Permalink
registry: Introduce hash-wasm so we can store the hash state and we c…
Browse files Browse the repository at this point in the history
…an continue hashing from where we left off
  • Loading branch information
gabivlj committed Nov 27, 2024
1 parent 9511163 commit a269a43
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 12 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"dependencies": {
"@cfworker/base64url": "^1.12.5",
"@taylorzane/hash-wasm": "^0.0.11",
"@tsndr/cloudflare-worker-jwt": "^2.5.1",
"itty-router": "^4.0.27",
"zod": "^3.22.4"
Expand Down
8 changes: 8 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 90 additions & 10 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createSHA256, IHasher, loadWasm } from "@taylorzane/hash-wasm";
import { Env } from "../..";
import jwt from "@tsndr/cloudflare-worker-jwt";
import {
Expand All @@ -10,6 +11,8 @@ import {
split,
} from "../chunk";
import { InternalError, ManifestError, RangeError, ServerError } from "../errors";
import { Buffer } from "node:buffer";

import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user";
import { errorString, readableToBlob, readerToBlob, wrap } from "../utils";
import { BlobUnknownError, ManifestUnknownError } from "../v2-errors";
Expand All @@ -30,6 +33,38 @@ import {
import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector";
import { ManifestSchema, manifestSchema } from "../manifest";
import { DigestInvalid, RegistryResponseJSON } from "../v2-responses";
// @ts-expect-error: No declaration file for module
import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm";

/**
 * Create a SHA-256 hasher (hash-wasm) and feed it the given stream.
 *
 * @param readableStream stream to consume, or null to only restore state
 *                       (e.g. to finalize a digest from a saved checkpoint).
 * @param state          serialized hasher state previously produced by
 *                       IHasher.save(); when present, hashing resumes from it.
 * @returns the hasher, updated with all bytes read from the stream. The
 *          caller is responsible for calling digest()/save() on it.
 */
export async function hash(readableStream: ReadableStream | null, state?: Uint8Array): Promise<IHasher> {
  loadWasm({ sha256: sha256Wasm });
  let hasher = await createSHA256();
  if (state !== undefined) {
    // Resume from a previously serialized state instead of starting fresh.
    hasher.load(state);
  } else {
    hasher = hasher.init();
  }

  const reader = readableStream?.getReader({ mode: "byob" });
  if (reader !== undefined) {
    // Read limit 5MB so we don't buffer that much memory at a time (Workers
    // runtime is kinda weird with constraints with tee() if the other stream
    // side is very slow).
    let buffer: ArrayBufferLike = new ArrayBuffer(1024 * 1024 * 5);
    try {
      for (;;) {
        // A BYOB read detaches the buffer we pass in and returns a view over
        // the transferred bytes, so reuse that buffer on the next iteration
        // instead of allocating a new 5MB block per chunk.
        const result = await reader.read(new Uint8Array(buffer));
        if (result.done) break;
        hasher.update(result.value);
        buffer = result.value.buffer;
      }
    } finally {
      // Release the lock so the caller can still cancel/drain the stream.
      reader.releaseLock();
    }
  }

  return hasher;
}

/**
 * Decode a base64-encoded hasher state (as stored in the upload State)
 * back into the raw bytes expected by IHasher.load().
 */
export function hashStateToUint8Array(hashState: string): Uint8Array {
  const decoded = Buffer.from(hashState, "base64");
  return Uint8Array.from(decoded);
}

/**
 * Encode raw hasher-state bytes as base64 so they can be carried inside
 * the JWT-encoded upload State. Inverse of hashStateToUint8Array().
 */
export function intoBase64FromUint8Array(array: Uint8Array): string {
  const bytes = Buffer.from(array.buffer, array.byteOffset, array.byteLength);
  return bytes.toString("base64");
}

export type Chunk =
| {
Expand Down Expand Up @@ -66,6 +101,7 @@ export type State = {
registryUploadId: string;
byteRange: number;
name: string;
hashState?: string;
};

export function getRegistryUploadsPath(state: { registryUploadId: string; name: string }): string {
Expand Down Expand Up @@ -686,12 +722,48 @@ export class R2Registry implements Registry {
};
}

const res = await appendStreamKnownLength(stream, length);
let hasherPromise: Promise<IHasher> | undefined;
if (
length <= MAXIMUM_CHUNK &&
// if starting, or already started.
(state.parts.length === 0 || (state.parts.length > 0 && state.hashState !== undefined))
) {
const [s1, s2] = stream.tee();
stream = s1;
let bytes: undefined | Uint8Array;
if (state.hashState !== undefined) {
bytes = hashStateToUint8Array(state.hashState);
}

hasherPromise = hash(s2, bytes);
} else {
state.hashState = undefined;
}

const [res, hasherResponse] = await Promise.allSettled([appendStreamKnownLength(stream, length), hasherPromise]);
state.byteRange += length;
if (res instanceof RangeError)
if (res.status === "rejected") {
return {
response: new InternalError(),
};
}

if (res.value instanceof RangeError) {
return {
response: res,
response: res.value,
};
}

if (hasherPromise !== undefined && hasherResponse !== undefined) {
if (hasherResponse.status === "rejected") {
throw hasherResponse.reason;
}

if (hasherResponse.value === undefined) throw new Error("unreachable");

const value = hasherResponse.value.save();
state.hashState = intoBase64FromUint8Array(value);
}

const hashedJwtState = await encodeState(state, env);
return {
Expand Down Expand Up @@ -758,16 +830,24 @@ export class R2Registry implements Registry {
};
}

const target = `${namespace}/blobs/${expectedSha}`;
const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000;
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT && state.hashState === undefined) {
console.error(`The maximum size of an R2 object is 5gb, multipart uploads don't
have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb`);
return {
response: new InternalError(),
};
}

const target = `${namespace}/blobs/${expectedSha}`;
// If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
// stream and create a reference from the blobs path to the
// upload path. In R2, moving objects mean copying the stream, which
// doesn't really work if it's above 5GB due to R2 limits.
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) {
const compressionStream = new crypto.DigestStream("SHA-256");
obj.body.pipeTo(compressionStream);
const digest = hexToDigest(await compressionStream.digest);
// upload path. That's why we need hash-wasm, as it allows you to store the state.
if (state.hashState !== undefined) {
const stateEncoded = hashStateToUint8Array(state.hashState);
const hasher = await hash(null, stateEncoded);
const digest = hasher.digest("hex");

if (digest !== expectedSha) {
return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) };
}
Expand Down
11 changes: 9 additions & 2 deletions src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ v2Router.delete("/:name+/blobs/uploads/:id", async (req, env: Env) => {

// this is the first thing that the client asks for in an upload
v2Router.post("/:name+/blobs/uploads/", async (req, env: Env) => {
const { name } = req.params;
const { name } = req.params;
const [uploadObject, err] = await wrap<UploadObject | RegistryError, Error>(env.REGISTRY_CLIENT.startUpload(name));

if (err) {
Expand Down Expand Up @@ -398,6 +398,13 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
return new Response(null, { status: 400 });
}

// if (req.headers.get("x-fail") === "true") {
// const digest = new crypto.DigestStream("SHA-256");
// req.body.pipeTo(digest);
// await digest.digest;
// return new Response(null, { status: 500 });
// }

let contentLengthString = req.headers.get("Content-Length");
let stream = req.body;
if (!contentLengthString) {
Expand Down Expand Up @@ -455,7 +462,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
);

if (err) {
console.error("Error uploading manifest", errorString(err))
console.error("Error uploading manifest", errorString(err));
return new InternalError();
}

Expand Down

0 comments on commit a269a43

Please sign in to comment.