Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garbage collector #48

Merged
merged 5 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 9
version: 9.9
- name: Use Node
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}
cache: 'pnpm'
cache: "pnpm"

- run: pnpm install --frozen-lockfile --child-concurrency=10
- run: cp wrangler.toml.example wrangler.toml
Expand Down
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export default {
return new InternalError();
}
},
};
} satisfies ExportedHandler<Env>;

const ensureConfig = (env: Env): boolean => {
if (!env.REGISTRY) {
Expand Down
20 changes: 10 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"deploy": "wrangler publish",
"dev:miniflare": "cross-env NODE_ENV=development wrangler --env dev dev --port 9999 --live-reload",
"typecheck": "tsc",
"test": "cross-env NODE_OPTIONS=--experimental-vm-modules vitest run"
"test": "vitest --config test/vitest.config.ts run"
},
"dependencies": {
"@cfworker/base64url": "^1.12.5",
Expand All @@ -16,23 +16,23 @@
"zod": "^3.22.4"
},
"devDependencies": {
"@cloudflare/vitest-pool-workers": "^0.5.7",
"@cloudflare/workers-types": "^4.20240614.0",
"cross-env": "^7.0.3",
"eslint": "^8.57.0",
"miniflare": "3.20240208.0",
"miniflare": "3.20240909.4",
"typescript": "^5.3.3",
"vitest": "^1.3.1",
"vitest-environment-miniflare": "^2.14.2",
"wrangler": "^3.61.0"
"vitest": "^2.1.0",
"wrangler": "^3.78.7"
},
"engines": {
"node": ">=18"
},
"author": "",
"license": "Apache-2.0",
"pnpm": {
"overrides": {
"@types/node": "18.15.3"
}
}
"pnpm": {
"overrides": {
"@types/node": "18.15.3"
}
}
}
1,076 changes: 272 additions & 804 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion src/chunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export function limit(streamInput: ReadableStream, limitBytes: number): Readable
r.releaseLock();
w.releaseLock();
await stream.writable.close();
await stream.readable.cancel();
})();

return stream.readable;
Expand Down
24 changes: 24 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ export class InternalError extends Response {
}
}

export class ManifestError extends Response {
constructor(
code: "MANIFEST_INVALID" | "BLOB_UNKNOWN" | "MANIFEST_UNVERIFIED" | "TAG_INVALID" | "NAME_INVALID",
message: string,
detail: Record<string, string> = {},
) {
const jsonBody = JSON.stringify({
errors: [
{
code,
message,
detail,
},
],
});
super(jsonBody, {
status: 400,
headers: {
"content-type": "application/json;charset=UTF-8",
},
});
}
}

export class ServerError extends Response {
constructor(message: string, errorCode = 500) {
super(JSON.stringify({ errors: [{ code: "SERVER_ERROR", message, detail: null }] }), {
Expand Down
31 changes: 31 additions & 0 deletions src/manifest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { z } from "zod";

// https://github.com/opencontainers/image-spec/blob/main/manifest.md
export const manifestSchema = z.object({
schemaVersion: z.literal(2),
artifactType: z.string().optional(),
// to maintain retrocompatibility of the registry, let's not assume mediaTypes
mediaType: z.string(),
config: z.object({
mediaType: z.string(),
digest: z.string(),
size: z.number().int(),
}),
layers: z.array(
z.object({
size: z.number().int(),
mediaType: z.string(),
digest: z.string(),
}),
),
annotations: z.record(z.string()).optional(),
subject: z
.object({
mediaType: z.string(),
digest: z.string(),
size: z.number().int(),
})
.optional(),
});

export type ManifestSchema = z.infer<typeof manifestSchema>;
169 changes: 169 additions & 0 deletions src/registry/garbage-collector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// We have 2 modes for the garbage collector, unreferenced and untagged.
// Unreferenced will delete all blobs that are not referenced by any manifest.
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.

import { ManifestSchema } from "../manifest";

export type GarbageCollectionMode = "unreferenced" | "untagged";
export type GCOptions = {
name: string;
mode: GarbageCollectionMode;
};

// The garbage collector checks for dangling layers in the namespace. It's a lock free
// GC, but on-conflict (when there is an ongoing manifest insertion, or an ongoing garbage collection),
// the methods can throw errors.
//
// Summary:
// insertParent() {
// mark = updateInsertMark(); // mark insertion
// defer cleanInsertMark(mark);
// checkEveryChildIsOK();
// getDeletionMarkIsFalse(); // make sure not ongoing deletion mark after checking child is in db
// insertParent(); // insert parent in db
// }
//
// gc() {
// setDeletionMark() // marks deletion as gc
// defer { cleanDeletionMark(); } // clean up mark
// checkNotOngoingInsertMark() // makes sure not ongoing updateInsertMark
// deleteChildrenWithoutParent(); // go ahead and clean children
// }
//
// This makes it so: after every layer is OK we can proceed and insert the manifest, as there is no ongoing GC
// In the GC code, if there is an insertion on-going, there is an error.
export class GarbageCollector {
private registry: R2Bucket;

constructor(registry: R2Bucket) {
this.registry = registry;
}

async markForGarbageCollection(namespace: string): Promise<string> {
const etag = crypto.randomUUID();
const deletion = await this.registry.put(`${namespace}/deletion`, etag);
if (deletion === null) throw new Error("unreachable");
return etag;
}

async cleanupGarbageCollectionMark(namespace: string) {
await this.registry.delete(`${namespace}/deletion`);
}

async checkCanInsertData(namespace: string): Promise<boolean> {
const deletion = await this.registry.head(`${namespace}/deletion`);
if (deletion === null) {
return true;
}

return false;
}

// If successful, it inserted in R2 that its going
// to start inserting data that might conflight with GC.
async markForInsertion(namespace: string): Promise<string> {
const uid = crypto.randomUUID();
const deletion = await this.registry.put(`${namespace}/insertion/${uid}`, uid);
if (deletion === null) throw new Error("unreachable");
return uid;
}

async cleanInsertion(namespace: string, tag: string) {
await this.registry.delete(`${namespace}/insertion/${tag}`);
}

async checkIfGCCanContinue(namespace: string): Promise<boolean> {
const objects = await this.registry.list({ prefix: `${namespace}/insertion` });
for (const object of objects.objects) {
if (object.uploaded.getTime() + 1000 * 60 <= Date.now()) {
await this.registry.delete(object.key);
} else {
return false;
}
}

// call again to clean more
if (objects.truncated) return false;
return true;
}

private async list(prefix: string, callback: (object: R2Object) => Promise<boolean>): Promise<boolean> {
const listed = await this.registry.list({ prefix });
for (const object of listed.objects) {
if ((await callback(object)) === false) {
return false;
}
}

let truncated = listed.truncated;
let cursor = listed.truncated ? listed.cursor : undefined;

while (truncated) {
const next = await this.registry.list({ prefix, cursor });
for (const object of next.objects) {
if ((await callback(object)) === false) {
return false;
}
}
truncated = next.truncated;
cursor = truncated ? cursor : undefined;
}
return true;
}

async collect(options: GCOptions): Promise<boolean> {
await this.markForGarbageCollection(options.name);
try {
return await this.collectInner(options);
} finally {
// if this fails, user can always call a custom endpoint to clean it up
await this.cleanupGarbageCollectionMark(options.name);
}
}

private async collectInner(options: GCOptions): Promise<boolean> {
// We can run out of memory, this should be a bloom filter
let referencedBlobs = new Set<string>();

await this.list(`${options.name}/manifests/`, async (manifestObject) => {
const tag = manifestObject.key.split("/").pop();
if (!tag || (options.mode === "untagged" && tag.startsWith("sha256:"))) {
return true;
}
const manifest = await this.registry.get(manifestObject.key);
if (!manifest) {
return true;
}

const manifestData = (await manifest.json()) as ManifestSchema;
manifestData.layers.forEach((layer) => {
referencedBlobs.add(layer.digest);
});

return true;
});

let unreferencedKeys: string[] = [];
const deleteThreshold = 15;
await this.list(`${options.name}/blobs/`, async (object) => {
if (!(await this.checkIfGCCanContinue(options.name))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

const hash = object.key.split("/").pop();
if (hash && !referencedBlobs.has(hash)) {
unreferencedKeys.push(object.key);
if (unreferencedKeys.length > deleteThreshold) {
await this.registry.delete(unreferencedKeys);
unreferencedKeys = [];
}
}
return true;
});
if (unreferencedKeys.length > 0) {
await this.registry.delete(unreferencedKeys);
}

return true;
}
}
5 changes: 5 additions & 0 deletions src/registry/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Env } from "../..";
import { InternalError } from "../errors";
import { errorString } from "../utils";
import { GarbageCollectionMode } from "./garbage-collector";
import {
CheckLayerResponse,
CheckManifestResponse,
Expand Down Expand Up @@ -473,6 +474,10 @@ export class RegistryHTTPClient implements Registry {
async listRepositories(_limit?: number, _last?: string): Promise<RegistryError | ListRepositoriesResponse> {
throw new Error("unimplemented");
}

garbageCollection(_namespace: string, _mode: GarbageCollectionMode): Promise<boolean> {
throw new Error("unimplemented");
}
}

// AuthType defined the supported auth types
Expand Down
Loading