From b28155e5a403b2a45f62fe48cb2c1c8aa5f58657 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Mon, 25 Nov 2024 16:44:46 +0000 Subject: [PATCH 1/3] registry: Fixes to allow layer uploads above 5GB As we need to copy a stream of the uploaded layer (random uuid) to a `blobs/.../` path, that might surpass the maximum size of a monolithic upload to R2 without using multipart uploads. To circumvent this, we will calculate the digest from the uploaded path and compare. Then, we will create a reference in the original `blobs/.../digest` path pointing to the upload path. --- src/registry/garbage-collector.ts | 7 ++ src/registry/r2.ts | 125 +++++++++++++++++++++++++++--- src/router.ts | 1 + src/v2-responses.ts | 35 +++++++++ test/index.test.ts | 5 +- 5 files changed, 161 insertions(+), 12 deletions(-) diff --git a/src/registry/garbage-collector.ts b/src/registry/garbage-collector.ts index 79fdafe..ecfc80e 100644 --- a/src/registry/garbage-collector.ts +++ b/src/registry/garbage-collector.ts @@ -4,6 +4,7 @@ import { ServerError } from "../errors"; import { ManifestSchema } from "../manifest"; +import { isReference } from "./r2"; export type GarbageCollectionMode = "unreferenced" | "untagged"; export type GCOptions = { @@ -219,6 +220,12 @@ export class GarbageCollector { await this.list(`${options.name}/blobs/`, async (object) => { const hash = object.key.split("/").pop(); if (hash && !referencedBlobs.has(hash)) { + const key = isReference(object); + // also push the underlying reference object + if (key) { + unreferencedKeys.push(key); + } + unreferencedKeys.push(object.key); if (unreferencedKeys.length > deleteThreshold) { if (!(await this.checkIfGCCanContinue(options.name, mark))) { diff --git a/src/registry/r2.ts b/src/registry/r2.ts index ef09b01..b9288e7 100644 --- a/src/registry/r2.ts +++ b/src/registry/r2.ts @@ -11,7 +11,7 @@ import { } from "../chunk"; import { InternalError, ManifestError, RangeError, ServerError } from "../errors"; import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user"; -import { readableToBlob, readerToBlob, wrap } from "../utils"; +import { errorString, readableToBlob, readerToBlob, wrap } from "../utils"; import { BlobUnknownError, ManifestUnknownError } from "../v2-errors"; import { CheckLayerResponse, @@ -29,6 +29,7 @@ import { } from "./registry"; import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector"; import { ManifestSchema, manifestSchema } from "../manifest"; +import { DigestInvalid, RegistryResponseJSON } from "../v2-responses"; export type Chunk = | { @@ -101,6 +102,10 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") }; } +export const referenceHeader = "X-Serverless-Registry-Reference"; +export const digestHeaderInReference = "X-Serverless-Registry-Digest"; +export const registryUploadKey = "X-Serverless-Registry-Upload"; + export async function getUploadState( name: string, uploadId: string, @@ -127,6 +132,15 @@ export async function getUploadState( return { state: stateObject, stateStr: stateStr, hash: stateStrHash }; } +export function isReference(r2Object: R2Object): false | string { + if (r2Object.customMetadata === undefined) return false; + const value = r2Object.customMetadata[referenceHeader]; + if (value !== undefined) { + return value; + } + return false; +} + export class R2Registry implements Registry { private gc: GarbageCollector; @@ -196,6 +210,11 @@ export class R2Registry implements Registry { // 
name format is: // /<'blobs' | 'manifests'>/ const parts = object.key.split("/"); + // maybe an upload. + if (parts.length === 1) { + return; + } + const repository = parts.slice(0, parts.length - 2).join("/"); if (parts[parts.length - 2] === "blobs") return; @@ -389,15 +408,39 @@ export class R2Registry implements Registry { }; } + const key = isReference(res); + let [digest, size] = ["", 0]; + if (key) { + const [res, err] = await wrap(this.env.REGISTRY.head(key)); + if (err) { + return wrapError("layerExists", err); + } + + if (!res) { + return { exists: false }; + } + + if (!res.customMetadata) throw new Error("unreachable"); + if (!res.customMetadata[digestHeaderInReference]) throw new Error("unreachable"); + const possibleDigest = res.customMetadata[digestHeaderInReference]; + if (!possibleDigest) throw new Error("unreachable, no digest"); + + digest = possibleDigest; + size = res.size; + } else { + digest = hexToDigest(res.checksums.sha256!); + size = res.size; + } + return { - digest: hexToDigest(res.checksums.sha256!), - size: res.size, + digest, + size, exists: true, }; } async getLayer(name: string, digest: string): Promise { - const [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`)); + let [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`)); if (err) { return wrapError("getLayer", err); } @@ -408,9 +451,24 @@ export class R2Registry implements Registry { }; } + const id = isReference(res); + if (id) { + [res, err] = await wrap(this.env.REGISTRY.get(id)); + if (err) { + return wrapError("getLayer", err); + } + + if (!res) { + // not a 500, because garbage collection deletes the underlying layer first + return { + response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }), + }; + } + } + return { stream: res.body!, - digest: hexToDigest(res.checksums.sha256!), + digest, size: res.size, }; } @@ -419,7 +477,9 @@ export class R2Registry implements Registry { // Generate a unique ID for this upload const uuid = crypto.randomUUID(); - const upload = await this.env.REGISTRY.createMultipartUpload(uuid); + const upload = await this.env.REGISTRY.createMultipartUpload(uuid, { + customMetadata: { [registryUploadKey]: "true" }, + }); const state = { uploadId: upload.uploadId, parts: [], @@ -691,12 +751,55 @@ export class R2Registry implements Registry { // TODO: Handle one last buffer here await upload.complete(state.parts); const obj = await this.env.REGISTRY.get(uuid); - const put = this.env.REGISTRY.put(`${namespace}/blobs/${expectedSha}`, obj!.body, { - sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN), - }); + if (obj === null) { + console.error("unreachable, obj is null when we just created upload"); + return { + response: new InternalError(), + }; + } - await put; - await this.env.REGISTRY.delete(uuid); + const target = `${namespace}/blobs/${expectedSha}`; + const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000; + // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest + // stream and create a reference from the blobs path to the + // upload path. In R2, moving objects mean copying the stream, which + // doesn't really work if it's above 5GB due to R2 limits. 
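+    // The "reference" written below is a tiny object at the blobs path: its body is
+    // just the upload uuid, and its custom metadata records that uuid plus the
+    // verified digest, so layerExists/getLayer above resolve it via isReference()
+    // before streaming and the oversized body itself is never copied.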
+ if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) { + const compressionStream = new crypto.DigestStream("SHA-256"); + obj.body.pipeTo(compressionStream); + const digest = hexToDigest(await compressionStream.digest); + if (digest !== expectedSha) { + return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) }; + } + + const [, err] = await wrap( + this.env.REGISTRY.put(target, uuid, { + customMetadata: { + [referenceHeader]: uuid, + [digestHeaderInReference]: digest, + }, + }), + ); + if (err !== null) { + console.error("error uploading reference blob", errorString(err)); + await this.env.REGISTRY.delete(uuid); + return { + response: new InternalError(), + }; + } + } else { + const put = this.env.REGISTRY.put(target, obj!.body, { + sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN), + }); + const [, err] = await wrap(put); + await this.env.REGISTRY.delete(uuid); + if (err !== null) { + console.error("error uploading blob", errorString(err)); + return { + response: new InternalError(), + }; + } + } } await this.env.REGISTRY.delete(getRegistryUploadsPath(state)); diff --git a/src/router.ts b/src/router.ts index 19fac16..fb0753f 100644 --- a/src/router.ts +++ b/src/router.ts @@ -455,6 +455,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => { ); if (err) { + console.error("Error uploading manifest", errorString(err)) return new InternalError(); } diff --git a/src/v2-responses.ts b/src/v2-responses.ts index 502893d..6948361 100644 --- a/src/v2-responses.ts +++ b/src/v2-responses.ts @@ -7,3 +7,38 @@ export const ManifestTagsListTooBigError = { }, ], }; + +export class RegistryResponse extends Response { + constructor(body?: BodyInit | null, init?: ResponseInit) { + super(body, { + ...init, + headers: { + ...init?.headers, + "Docker-Distribution-Api-Version": "registry/2.0", + }, + }); + } +} +export class RegistryResponseJSON extends RegistryResponse { + constructor(body?: BodyInit | null, init?: ResponseInit) { + super(body, { + ...init, + headers: { + ...init?.headers, + "Content-Type": "application/json", + }, + }); + } +} +export const DigestInvalid = (expected: string, got: string) => ({ + errors: [ + { + code: "DIGEST_INVALID", + message: "digests don't match", + detail: { + Expected: expected, + Got: got, + }, + }, + ], +}); diff --git a/test/index.test.ts b/test/index.test.ts index 09d7472..c672364 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -22,7 +22,10 @@ async function generateManifest(name: string, schemaVersion: 1 | 2 = 2): Promise const res2 = await fetch(createRequest("PATCH", res.headers.get("location")!, stream, {})); expect(res2.ok).toBeTruthy(); const last = await fetch(createRequest("PUT", res2.headers.get("location")! + "&digest=" + sha256, null, {})); - expect(last.ok).toBeTruthy(); + if (!last.ok) { + throw new Error(await last.text()); + } + return schemaVersion === 1 ? 
{ schemaVersion, From 9511163514ecb8fe48f10e35ae9aa16d54c7d5d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20Villalonga=20Sim=C3=B3n?= Date: Tue, 26 Nov 2024 17:11:11 -0600 Subject: [PATCH 2/3] push: Implement retry logic (test) --- push/index.ts | 46 ++++++++++++++++++++++++++++++---------------- push/limiter.ts | 27 +++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/push/index.ts b/push/index.ts index 9b4cc66..6d73228 100644 --- a/push/index.ts +++ b/push/index.ts @@ -1,4 +1,4 @@ -import { $, CryptoHasher, file, write } from "bun"; +import { $, CryptoHasher, file, sleep, write } from "bun"; import { extract } from "tar"; import stream from "node:stream"; @@ -71,7 +71,7 @@ if (!(await file(tarFile).exists())) { await mkdir(imagePath); - const result = await extract({ + await extract({ file: tarFile, cwd: imagePath, }); @@ -251,7 +251,6 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to throw new Error(`oci-chunk-max-length header is malformed (not a number)`); } - const reader = readableStream.getReader(); const uploadId = createUploadResponse.headers.get("docker-upload-uuid"); if (uploadId === null) { throw new Error("Docker-Upload-UUID not defined in headers"); @@ -271,9 +270,13 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to let written = 0; let previousReadable: ReadableLimiter | undefined; let totalLayerSizeLeft = totalLayerSize; + const reader = readableStream.getReader(); + let fail = "true"; + let failures = 0; while (totalLayerSizeLeft > 0) { const range = `0-${Math.min(end, totalLayerSize) - 1}`; const current = new ReadableLimiter(reader as ReadableStreamDefaultReader, maxToWrite, previousReadable); + await current.init(); const patchChunkUploadURL = parseLocation(location); // we have to do fetchNode because Bun doesn't allow setting custom Content-Length. 
// https://github.com/oven-sh/bun/issues/10507 @@ -284,14 +287,19 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to "range": range, "authorization": cred, "content-length": `${Math.min(totalLayerSizeLeft, maxToWrite)}`, + "x-fail": fail, }), }); if (!patchChunkResult.ok) { - throw new Error( - `uploading chunk ${patchChunkUploadURL} returned ${patchChunkResult.status}: ${await patchChunkResult.text()}`, - ); + previousReadable = current; + console.error(`${layerDigest}: Pushing ${range} failed with ${patchChunkResult.status}, retrying`); + await sleep(500); + if (failures++ >= 2) fail = "false"; + continue; } + fail = "true"; + current.ok(); const rangeResponse = patchChunkResult.headers.get("range"); if (rangeResponse !== range) { throw new Error(`unexpected Range header ${rangeResponse}, expected ${range}`); @@ -308,18 +316,24 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to const range = `0-${written - 1}`; const uploadURL = new URL(parseLocation(location)); uploadURL.searchParams.append("digest", layerDigest); - const response = await fetch(uploadURL.toString(), { - method: "PUT", - headers: new Headers({ - Range: range, - Authorization: cred, - }), - }); - if (!response.ok) { - throw new Error(`${uploadURL.toString()} failed with ${response.status}: ${await response.text()}`); + for (let tries = 0; tries < 3; tries++) { + const response = await fetch(uploadURL.toString(), { + method: "PUT", + headers: new Headers({ + Range: range, + Authorization: cred, + }), + }); + if (!response.ok) { + console.error(`${layerDigest}: Finishing ${range} failed with ${response.status}, retrying`); + continue; + } + + console.log("Pushed", layerDigest); + return; } - console.log("Pushed", layerDigest); + throw new Error(`Could not push after multiple tries`); } const layersManifest = [] as { diff --git a/push/limiter.ts b/push/limiter.ts index 422e068..8d08f88 100644 --- a/push/limiter.ts +++ b/push/limiter.ts @@ -6,6 +6,8 @@ import stream from "node:stream"; export class ReadableLimiter extends stream.Readable { public written: number = 0; private leftover: Uint8Array | undefined; + private promise: Promise | undefined; + private accumulator: Uint8Array[]; constructor( // reader will be used to read bytes until limit. @@ -17,7 +19,27 @@ export class ReadableLimiter extends stream.Readable { ) { super(); - if (previousReader) this.leftover = previousReader.leftover; + if (previousReader) { + this.leftover = previousReader.leftover; + if (previousReader.accumulator.length > 0) { + this.promise = new Blob(previousReader.accumulator).bytes(); + previousReader.accumulator = []; + } + } + + this.accumulator = []; + } + + async init() { + if (this.promise !== undefined) { + if (this.leftover !== undefined && this.leftover.length > 0) + this.leftover = await new Blob([await this.promise, this.leftover ?? 
[]]).bytes(); + else this.leftover = await this.promise; + } + } + + ok() { + this.accumulator = []; } _read(): void { @@ -27,6 +49,7 @@ export class ReadableLimiter extends stream.Readable { if (this.leftover !== undefined) { const toPushNow = this.leftover.slice(0, this.limit); + this.accumulator.push(toPushNow); this.leftover = this.leftover.slice(this.limit); this.push(toPushNow); this.limit -= toPushNow.length; @@ -50,7 +73,7 @@ export class ReadableLimiter extends stream.Readable { } if (arr.length === 0) return this.push(null); - + this.accumulator.push(arr); this.push(arr); this.limit -= arr.length; this.written += arr.length; From b7403912288a4361582b6600b99448ba133e912f Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 27 Nov 2024 00:08:21 +0000 Subject: [PATCH 3/3] registry: Introduce hash-wasm so we can store the hash state and we can continue hashing from where we left it --- package.json | 1 + pnpm-lock.yaml | 8 ++++ src/registry/r2.ts | 100 ++++++++++++++++++++++++++++++++++++++++----- src/router.ts | 11 ++++- 4 files changed, 108 insertions(+), 12 deletions(-) diff --git a/package.json b/package.json index 2d1cf78..781a682 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ }, "dependencies": { "@cfworker/base64url": "^1.12.5", + "@taylorzane/hash-wasm": "^0.0.11", "@tsndr/cloudflare-worker-jwt": "^2.5.1", "itty-router": "^4.0.27", "zod": "^3.22.4" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 960acca..60f6597 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@cfworker/base64url': specifier: ^1.12.5 version: 1.12.5 + '@taylorzane/hash-wasm': + specifier: ^0.0.11 + version: 0.0.11 '@tsndr/cloudflare-worker-jwt': specifier: ^2.5.1 version: 2.5.3 @@ -556,6 +559,9 @@ packages: cpu: [x64] os: [win32] + '@taylorzane/hash-wasm@0.0.11': + resolution: {integrity: sha512-gGS/x7hCq6MlwGoLiscyTG/pSaVhshImaHd+ttV1uK0aYPCQNsbPCROAzBDCh+FxpiLgzX6JaX8BIYFlljFEpQ==} + '@tsndr/cloudflare-worker-jwt@2.5.3': resolution: {integrity: sha512-zbdvjRG86y/ObiBgTJrzBC39t2FcaeGwB6AV7VO4LvHKJNyZvLYRbKT68eaoJhnJldyHhs7yZ69neRVdUd9knA==} @@ -1689,6 +1695,8 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.22.4': optional: true + '@taylorzane/hash-wasm@0.0.11': {} + '@tsndr/cloudflare-worker-jwt@2.5.3': {} '@types/estree@1.0.5': {} diff --git a/src/registry/r2.ts b/src/registry/r2.ts index b9288e7..6122bde 100644 --- a/src/registry/r2.ts +++ b/src/registry/r2.ts @@ -1,3 +1,4 @@ +import { createSHA256, IHasher, loadWasm } from "@taylorzane/hash-wasm"; import { Env } from "../.."; import jwt from "@tsndr/cloudflare-worker-jwt"; import { @@ -10,6 +11,8 @@ import { split, } from "../chunk"; import { InternalError, ManifestError, RangeError, ServerError } from "../errors"; +import { Buffer } from "node:buffer"; + import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user"; import { errorString, readableToBlob, readerToBlob, wrap } from "../utils"; import { BlobUnknownError, ManifestUnknownError } from "../v2-errors"; @@ -30,6 +33,38 @@ import { import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector"; import { ManifestSchema, manifestSchema } from "../manifest"; import { DigestInvalid, RegistryResponseJSON } from "../v2-responses"; +// @ts-expect-error: No declaration file for module +import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm"; + +export async function hash(readableStream: ReadableStream | null, state?: Uint8Array): Promise { + loadWasm({ sha256: sha256Wasm }); + let hasher = await createSHA256(); + if 
(state !== undefined) { + hasher.load(state); + } else { + hasher = hasher.init(); + } + + const reader = readableStream?.getReader({ mode: "byob" }); + while (reader !== undefined) { + // Read limit 5MB so we don't buffer that much memory at a time (Workers runtime is kinda weird with constraints with tee() if the other stream side is very slow) + const array = new Uint8Array(1024 * 1024 * 5); + const value = await reader.read(array); + if (value.done) break; + hasher.update(value.value); + } + + return hasher; +} + +export function hashStateToUint8Array(hashState: string): Uint8Array { + const buffer = Buffer.from(hashState, "base64"); + return new Uint8Array(buffer); +} + +export function intoBase64FromUint8Array(array: Uint8Array): string { + return Buffer.from(array).toString("base64"); +} export type Chunk = | { @@ -66,6 +101,7 @@ export type State = { registryUploadId: string; byteRange: number; name: string; + hashState?: string; }; export function getRegistryUploadsPath(state: { registryUploadId: string; name: string }): string { @@ -686,12 +722,48 @@ export class R2Registry implements Registry { }; } - const res = await appendStreamKnownLength(stream, length); + let hasherPromise: Promise | undefined; + if ( + length <= MAXIMUM_CHUNK && + // if starting, or already started. + (state.parts.length === 0 || (state.parts.length > 0 && state.hashState !== undefined)) + ) { + const [s1, s2] = stream.tee(); + stream = s1; + let bytes: undefined | Uint8Array; + if (state.hashState !== undefined) { + bytes = hashStateToUint8Array(state.hashState); + } + + hasherPromise = hash(s2, bytes); + } else { + state.hashState = undefined; + } + + const [res, hasherResponse] = await Promise.allSettled([appendStreamKnownLength(stream, length), hasherPromise]); state.byteRange += length; - if (res instanceof RangeError) + if (res.status === "rejected") { + return { + response: new InternalError(), + }; + } + + if (res.value instanceof RangeError) { return { - response: res, + response: res.value, }; + } + + if (hasherPromise !== undefined && hasherResponse !== undefined) { + if (hasherResponse.status === "rejected") { + throw hasherResponse.reason; + } + + if (hasherResponse.value === undefined) throw new Error("unreachable"); + + const value = hasherResponse.value.save(); + state.hashState = intoBase64FromUint8Array(value); + } const hashedJwtState = await encodeState(state, env); return { @@ -758,16 +830,24 @@ export class R2Registry implements Registry { }; } - const target = `${namespace}/blobs/${expectedSha}`; const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000; + if (obj.size >= MAXIMUM_SIZE_R2_OBJECT && state.hashState === undefined) { + console.error(`The maximum size of an R2 object is 5gb, multipart uploads don't + have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb`); + return { + response: new InternalError(), + }; + } + + const target = `${namespace}/blobs/${expectedSha}`; // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest // stream and create a reference from the blobs path to the - // upload path. In R2, moving objects mean copying the stream, which - // doesn't really work if it's above 5GB due to R2 limits. - if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) { - const compressionStream = new crypto.DigestStream("SHA-256"); - obj.body.pipeTo(compressionStream); - const digest = hexToDigest(await compressionStream.digest); + // upload path. That's why we need hash-wasm, as it allows you to store the state. 
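+    // Each uploadChunk() call tee'd its chunk into hash() and stored the serialized
+    // hasher (hasher.save(), base64-encoded) as state.hashState inside the signed
+    // upload state; finishing only needs to load() that snapshot and finalize it,
+    // so the multi-gigabyte object never has to be re-read to verify the digest.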
+ if (state.hashState !== undefined) { + const stateEncoded = hashStateToUint8Array(state.hashState); + const hasher = await hash(null, stateEncoded); + const digest = "sha256:" + hasher.digest("hex"); + if (digest !== expectedSha) { return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) }; } diff --git a/src/router.ts b/src/router.ts index fb0753f..52a38bf 100644 --- a/src/router.ts +++ b/src/router.ts @@ -329,7 +329,7 @@ v2Router.delete("/:name+/blobs/uploads/:id", async (req, env: Env) => { // this is the first thing that the client asks for in an upload v2Router.post("/:name+/blobs/uploads/", async (req, env: Env) => { - const { name } = req.params; + const { name } = req.params; const [uploadObject, err] = await wrap(env.REGISTRY_CLIENT.startUpload(name)); if (err) { @@ -398,6 +398,13 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => { return new Response(null, { status: 400 }); } + // if (req.headers.get("x-fail") === "true") { + // const digest = new crypto.DigestStream("SHA-256"); + // req.body.pipeTo(digest); + // await digest.digest; + // return new Response(null, { status: 500 }); + // } + let contentLengthString = req.headers.get("Content-Length"); let stream = req.body; if (!contentLengthString) { @@ -455,7 +462,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => { ); if (err) { - console.error("Error uploading manifest", errorString(err)) + console.error("Error uploading manifest", errorString(err)); return new InternalError(); }
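
A minimal sketch of the save/load flow the last patch relies on, runnable outside
the Worker (it assumes only the hasher API already used above -- createSHA256,
init, update, save, load, digest; depending on the runtime you may also need the
loadWasm() call from src/registry/r2.ts before constructing a hasher):

    import { createSHA256 } from "@taylorzane/hash-wasm";

    // Request 1: hash the first chunk, then snapshot the hasher state.
    const h1 = await createSHA256();
    h1.init();
    h1.update(new TextEncoder().encode("first chunk"));
    const saved = h1.save(); // Uint8Array; base64-encode it to stash in the upload state

    // Request 2 (possibly another isolate): restore the snapshot and keep hashing.
    const h2 = await createSHA256();
    h2.load(saved);
    h2.update(new TextEncoder().encode("second chunk"));
    console.log("sha256:" + h2.digest("hex")); // digest over both chunks combined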