From 214f714b226670942604f0af7f7202cad8a5879c Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 27 Nov 2024 00:08:21 +0000 Subject: [PATCH] registry: Introduce hash-wasm so we can store the hash state and we can continue hashing from where we left it --- package.json | 1 + pnpm-lock.yaml | 8 ++++ src/registry/r2.ts | 98 +++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 97 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index 2d1cf78..781a682 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ }, "dependencies": { "@cfworker/base64url": "^1.12.5", + "@taylorzane/hash-wasm": "^0.0.11", "@tsndr/cloudflare-worker-jwt": "^2.5.1", "itty-router": "^4.0.27", "zod": "^3.22.4" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 960acca..60f6597 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@cfworker/base64url': specifier: ^1.12.5 version: 1.12.5 + '@taylorzane/hash-wasm': + specifier: ^0.0.11 + version: 0.0.11 '@tsndr/cloudflare-worker-jwt': specifier: ^2.5.1 version: 2.5.3 @@ -556,6 +559,9 @@ packages: cpu: [x64] os: [win32] + '@taylorzane/hash-wasm@0.0.11': + resolution: {integrity: sha512-gGS/x7hCq6MlwGoLiscyTG/pSaVhshImaHd+ttV1uK0aYPCQNsbPCROAzBDCh+FxpiLgzX6JaX8BIYFlljFEpQ==} + '@tsndr/cloudflare-worker-jwt@2.5.3': resolution: {integrity: sha512-zbdvjRG86y/ObiBgTJrzBC39t2FcaeGwB6AV7VO4LvHKJNyZvLYRbKT68eaoJhnJldyHhs7yZ69neRVdUd9knA==} @@ -1689,6 +1695,8 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.22.4': optional: true + '@taylorzane/hash-wasm@0.0.11': {} + '@tsndr/cloudflare-worker-jwt@2.5.3': {} '@types/estree@1.0.5': {} diff --git a/src/registry/r2.ts b/src/registry/r2.ts index b9288e7..0c6afc8 100644 --- a/src/registry/r2.ts +++ b/src/registry/r2.ts @@ -1,3 +1,4 @@ +import { createSHA256, IHasher, loadWasm } from "@taylorzane/hash-wasm"; import { Env } from "../.."; import jwt from "@tsndr/cloudflare-worker-jwt"; import { @@ -30,6 +31,38 @@ import { import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector"; import { ManifestSchema, manifestSchema } from "../manifest"; import { DigestInvalid, RegistryResponseJSON } from "../v2-responses"; +// @ts-expect-error: No declaration file for module +import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm"; + +export async function hash(readableStream: ReadableStream | null, state?: Uint8Array): Promise { + loadWasm({ sha256: sha256Wasm }); + let hasher = await createSHA256(); + if (state !== undefined) { + hasher.load(state); + } else { + hasher = hasher.init(); + } + + const reader = readableStream?.getReader({ mode: "byob" }); + while (reader !== undefined) { + // Read limit 5MB so we don't buffer that much memory at a time (Workers runtime is kinda weird with constraints with tee() if the other stream side is very slow) + const array = new Uint8Array(1024 * 1024 * 5); + const value = await reader.read(array); + if (value.done) break; + hasher.update(value.value); + } + + return hasher; +} + +export function hashStateToUint8Array(hashState: string): Uint8Array { + const buffer = Buffer.from(hashState, "base64"); + return new Uint8Array(buffer); +} + +export function intoBase64FromUint8Array(array: Uint8Array): string { + return Buffer.from(array).toString("base64"); +} export type Chunk = | { @@ -66,6 +99,7 @@ export type State = { registryUploadId: string; byteRange: number; name: string; + hashState?: string; }; export function getRegistryUploadsPath(state: { registryUploadId: string; name: string }): string { @@ -686,12 +720,48 @@ export class R2Registry implements Registry { }; } - const res = await appendStreamKnownLength(stream, length); + let hasherPromise: Promise | undefined; + if ( + length <= MAXIMUM_CHUNK && + // if starting, or already started. + (state.parts.length === 0 || (state.parts.length > 0 && state.hashState !== undefined)) + ) { + const [s1, s2] = stream.tee(); + stream = s1; + let bytes: undefined | Uint8Array; + if (state.hashState !== undefined) { + bytes = hashStateToUint8Array(state.hashState); + } + + hasherPromise = hash(s2, bytes); + } else { + state.hashState = undefined; + } + + const [res, hasherResponse] = await Promise.allSettled([appendStreamKnownLength(stream, length), hasherPromise]); state.byteRange += length; - if (res instanceof RangeError) + if (res.status === "rejected") { return { - response: res, + response: new InternalError(), }; + } + + if (res.value instanceof RangeError) { + return { + response: res.value, + }; + } + + if (hasherPromise !== undefined && hasherResponse !== undefined) { + if (hasherResponse.status === "rejected") { + throw hasherResponse.reason; + } + + if (hasherResponse.value === undefined) throw new Error("unreachable"); + + const value = hasherResponse.value.save(); + state.hashState = intoBase64FromUint8Array(value); + } const hashedJwtState = await encodeState(state, env); return { @@ -758,16 +828,24 @@ export class R2Registry implements Registry { }; } - const target = `${namespace}/blobs/${expectedSha}`; const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000; + if (obj.size >= MAXIMUM_SIZE_R2_OBJECT && state.hashState === undefined) { + console.error(`The maximum size of an R2 object is 5gb, multipart uploads don't + have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb`); + return { + response: new InternalError(), + }; + } + + const target = `${namespace}/blobs/${expectedSha}`; // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest // stream and create a reference from the blobs path to the - // upload path. In R2, moving objects mean copying the stream, which - // doesn't really work if it's above 5GB due to R2 limits. - if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) { - const compressionStream = new crypto.DigestStream("SHA-256"); - obj.body.pipeTo(compressionStream); - const digest = hexToDigest(await compressionStream.digest); + // upload path. That's why we need hash-wasm, as it allows you to store the state. + if (state.hashState !== undefined) { + const stateEncoded = hashStateToUint8Array(state.hashState); + const hasher = await hash(null, stateEncoded); + const digest = hasher.digest("hex"); + if (digest !== expectedSha) { return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) }; }