Skip to content

Commit

Permalink
registry: Fixes to allow layer uploads above 5GB
Browse files Browse the repository at this point in the history
As we need to copy a stream of the uploaded layer (random uuid) to
a `blobs/.../<digest>` path, that might surpass the maximum size of a
monolithic upload to R2 without using multipart uploads.

To circumvent this, we will calculate the digest from the uploaded path
and compare. Then, we will create a reference in the original
`blobs/.../digest` path pointing to the upload path.
  • Loading branch information
gabivlj committed Nov 25, 2024
1 parent 7f27e14 commit f7de24d
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 12 deletions.
8 changes: 8 additions & 0 deletions src/registry/garbage-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Unreferenced will delete all blobs that are not referenced by any manifest.
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.

import { unknown } from "zod";

Check failure on line 5 in src/registry/garbage-collector.ts

View workflow job for this annotation

GitHub Actions / Unit Tests (18.x)

'unknown' is declared but its value is never read.

Check failure on line 5 in src/registry/garbage-collector.ts

View workflow job for this annotation

GitHub Actions / Unit Tests (18.x)

'unknown' is declared but its value is never read.

Check failure on line 5 in src/registry/garbage-collector.ts

View workflow job for this annotation

GitHub Actions / Unit Tests (20.x)

'unknown' is declared but its value is never read.
import { ServerError } from "../errors";
import { ManifestSchema } from "../manifest";
import { isReference } from "./r2";

export type GarbageCollectionMode = "unreferenced" | "untagged";
export type GCOptions = {
Expand Down Expand Up @@ -219,6 +221,12 @@ export class GarbageCollector {
await this.list(`${options.name}/blobs/`, async (object) => {
const hash = object.key.split("/").pop();
if (hash && !referencedBlobs.has(hash)) {
const key = isReference(object);
// also push the underlying reference object
if (key) {
unreferencedKeys.push(key);
}

unreferencedKeys.push(object.key);
if (unreferencedKeys.length > deleteThreshold) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
Expand Down
126 changes: 115 additions & 11 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from "../chunk";
import { InternalError, ManifestError, RangeError, ServerError } from "../errors";
import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user";
import { readableToBlob, readerToBlob, wrap } from "../utils";
import { errorString, readableToBlob, readerToBlob, wrap } from "../utils";
import { BlobUnknownError, ManifestUnknownError } from "../v2-errors";
import {
CheckLayerResponse,
Expand All @@ -29,6 +29,7 @@ import {
} from "./registry";
import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector";
import { ManifestSchema, manifestSchema } from "../manifest";
import { DigestInvalid, RegistryResponseJSON } from "../v2-responses";

export type Chunk =
| {
Expand Down Expand Up @@ -101,6 +102,10 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") };
}

export const referenceHeader = "X-Serverless-Registry-Reference";
export const digestHeaderInReference = "X-Serverless-Registry-Digest";
export const registryUploadKey = "X-Serverless-Registry-Upload";

export async function getUploadState(
name: string,
uploadId: string,
Expand All @@ -127,6 +132,15 @@ export async function getUploadState(
return { state: stateObject, stateStr: stateStr, hash: stateStrHash };
}

export function isReference(r2Object: R2Object): false | string {
if (r2Object.customMetadata === undefined) return false;
const value = r2Object.customMetadata[referenceHeader];
if (value !== undefined) {
return value;
}
return false;
}

export class R2Registry implements Registry {
private gc: GarbageCollector;

Expand Down Expand Up @@ -196,12 +210,18 @@ export class R2Registry implements Registry {
// name format is:
// <path>/<'blobs' | 'manifests'>/<name>
const parts = object.key.split("/");
// maybe an upload.
if (parts.length === 1) {
return;
}

const repository = parts.slice(0, parts.length - 2).join("/");
if (parts[parts.length - 2] === "blobs") return;

if (repository in repositories) return;
totalRecords++;
repositories[repository] = {};
console.log("Pushing", object.key, object.customMetadata);
repositoriesOrder.push(repository);
};

Expand Down Expand Up @@ -389,15 +409,39 @@ export class R2Registry implements Registry {
};
}

const key = isReference(res);
let [digest, size] = ["", 0];
if (key) {
const [res, err] = await wrap(this.env.REGISTRY.head(key));
if (err) {
return wrapError("layerExists", err);
}

if (!res) {
return { exists: false };
}

if (!res.customMetadata) throw new Error("unreachable");
if (!res.customMetadata[digestHeaderInReference]) throw new Error("unreachable");
const possibleDigest = res.customMetadata[digestHeaderInReference];
if (!possibleDigest) throw new Error("unreachable, no digest");

digest = possibleDigest;
size = res.size;
} else {
digest = hexToDigest(res.checksums.sha256!);
size = res.size;
}

return {
digest: hexToDigest(res.checksums.sha256!),
size: res.size,
digest,
size,
exists: true,
};
}

async getLayer(name: string, digest: string): Promise<RegistryError | GetLayerResponse> {
const [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`));
let [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`));
if (err) {
return wrapError("getLayer", err);
}
Expand All @@ -408,9 +452,24 @@ export class R2Registry implements Registry {
};
}

const id = isReference(res);
if (id) {
[res, err] = await wrap(this.env.REGISTRY.get(id));
if (err) {
return wrapError("getLayer", err);
}

if (!res) {
// not a 500, because garbage collection deletes the underlying layer first
return {
response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }),
};
}
}

return {
stream: res.body!,
digest: hexToDigest(res.checksums.sha256!),
digest,
size: res.size,
};
}
Expand All @@ -419,7 +478,9 @@ export class R2Registry implements Registry {
// Generate a unique ID for this upload
const uuid = crypto.randomUUID();

const upload = await this.env.REGISTRY.createMultipartUpload(uuid);
const upload = await this.env.REGISTRY.createMultipartUpload(uuid, {
customMetadata: { [registryUploadKey]: "true" },
});
const state = {
uploadId: upload.uploadId,
parts: [],
Expand Down Expand Up @@ -691,12 +752,55 @@ export class R2Registry implements Registry {
// TODO: Handle one last buffer here
await upload.complete(state.parts);
const obj = await this.env.REGISTRY.get(uuid);
const put = this.env.REGISTRY.put(`${namespace}/blobs/${expectedSha}`, obj!.body, {
sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN),
});
if (obj === null) {
console.error("unreachable, obj is null when we just created upload");
return {
response: new InternalError(),
};
}

await put;
await this.env.REGISTRY.delete(uuid);
const target = `${namespace}/blobs/${expectedSha}`;
const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000;
// If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
// stream and create a reference from the blobs path to the
// upload path. In R2, moving objects mean copying the stream, which
// doesn't really work if it's above 5GB due to R2 limits.
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) {
const compressionStream = new crypto.DigestStream("SHA-256");
obj.body.pipeTo(compressionStream);
const digest = hexToDigest(await compressionStream.digest);
if (digest !== expectedSha) {
return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) };
}

const [, err] = await wrap(
this.env.REGISTRY.put(target, uuid, {
customMetadata: {
[referenceHeader]: uuid,
[digestHeaderInReference]: digest,
},
}),
);
if (err !== null) {
console.error("error uploading reference blob", errorString(err));
await this.env.REGISTRY.delete(uuid);
return {
response: new InternalError(),
};
}
} else {
const put = this.env.REGISTRY.put(target, obj!.body, {
sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN),
});
const [, err] = await wrap(put);
await this.env.REGISTRY.delete(uuid);
if (err !== null) {
console.error("error uploading blob", errorString(err));
return {
response: new InternalError(),
};
}
}
}

await this.env.REGISTRY.delete(getRegistryUploadsPath(state));
Expand Down
1 change: 1 addition & 0 deletions src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
);

if (err) {
console.error("Error uploading manifest", errorString(err))
return new InternalError();
}

Expand Down
35 changes: 35 additions & 0 deletions src/v2-responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,38 @@ export const ManifestTagsListTooBigError = {
},
],
};

export class RegistryResponse extends Response {
constructor(body?: BodyInit | null, init?: ResponseInit) {
super(body, {
...init,
headers: {
...init?.headers,
"Docker-Distribution-Api-Version": "registry/2.0",
},
});
}
}
export class RegistryResponseJSON extends RegistryResponse {
constructor(body?: BodyInit | null, init?: ResponseInit) {
super(body, {
...init,
headers: {
...init?.headers,
"Content-Type": "application/json",
},
});
}
}
export const DigestInvalid = (expected: string, got: string) => ({
errors: [
{
code: "DIGEST_INVALID",
message: "digests don't match",
detail: {
Expected: expected,
Got: got,
},
},
],
});
5 changes: 4 additions & 1 deletion test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ async function generateManifest(name: string, schemaVersion: 1 | 2 = 2): Promise
const res2 = await fetch(createRequest("PATCH", res.headers.get("location")!, stream, {}));
expect(res2.ok).toBeTruthy();
const last = await fetch(createRequest("PUT", res2.headers.get("location")! + "&digest=" + sha256, null, {}));
expect(last.ok).toBeTruthy();
if (!last.ok) {
throw new Error(await last.text());
}

return schemaVersion === 1
? {
schemaVersion,
Expand Down

0 comments on commit f7de24d

Please sign in to comment.