diff --git a/packages/miniflare/src/plugins/r2/index.ts b/packages/miniflare/src/plugins/r2/index.ts
index 570a07099..d92fc8331 100644
--- a/packages/miniflare/src/plugins/r2/index.ts
+++ b/packages/miniflare/src/plugins/r2/index.ts
@@ -1,15 +1,22 @@
+import fs from "fs/promises";
+import SCRIPT_R2_BUCKET_OBJECT from "worker:r2/bucket";
 import { z } from "zod";
-import { Service, Worker_Binding } from "../../runtime";
+import {
+	Service,
+	Worker_Binding,
+	Worker_Binding_DurableObjectNamespaceDesignator,
+} from "../../runtime";
+import { SharedBindings } from "../../workers";
 import {
 	PersistenceSchema,
 	Plugin,
+	getPersistPath,
 	kProxyNodeBinding,
+	migrateDatabase,
 	namespaceEntries,
 	namespaceKeys,
-	pluginNamespacePersistWorker,
+	objectEntryWorker,
 } from "../shared";
-import { R2Gateway } from "./gateway";
-import { R2Router } from "./router";

 export const R2OptionsSchema = z.object({
 	r2Buckets: z.union([z.record(z.string()), z.string().array()]).optional(),
@@ -19,41 +26,82 @@ export const R2SharedOptionsSchema = z.object({
 });

 export const R2_PLUGIN_NAME = "r2";
+const R2_STORAGE_SERVICE_NAME = `${R2_PLUGIN_NAME}:storage`;
+const R2_BUCKET_SERVICE_PREFIX = `${R2_PLUGIN_NAME}:bucket`;
+const R2_BUCKET_OBJECT_CLASS_NAME = "R2BucketObject";
+const R2_BUCKET_OBJECT: Worker_Binding_DurableObjectNamespaceDesignator = {
+	serviceName: R2_BUCKET_SERVICE_PREFIX,
+	className: R2_BUCKET_OBJECT_CLASS_NAME,
+};
+
 export const R2_PLUGIN: Plugin<
 	typeof R2OptionsSchema,
-	typeof R2SharedOptionsSchema,
-	R2Gateway
+	typeof R2SharedOptionsSchema
 > = {
-	gateway: R2Gateway,
-	router: R2Router,
 	options: R2OptionsSchema,
 	sharedOptions: R2SharedOptionsSchema,
 	getBindings(options) {
 		const buckets = namespaceEntries(options.r2Buckets);
 		return buckets.map(([name, id]) => ({
 			name,
-			r2Bucket: { name: `${R2_PLUGIN_NAME}:${id}` },
+			r2Bucket: { name: `${R2_BUCKET_SERVICE_PREFIX}:${id}` },
 		}));
 	},
 	getNodeBindings(options) {
 		const buckets = namespaceKeys(options.r2Buckets);
 		return Object.fromEntries(buckets.map((name) => [name, kProxyNodeBinding]));
 	},
-	getServices({ options, sharedOptions }) {
+	async getServices({ options, sharedOptions, tmpPath, log }) {
 		const persist = sharedOptions.r2Persist;
 		const buckets = namespaceEntries(options.r2Buckets);
-		return buckets.map(([_, id]) => ({
-			name: `${R2_PLUGIN_NAME}:${id}`,
-			worker: pluginNamespacePersistWorker(
-				R2_PLUGIN_NAME,
-				encodeURIComponent(id),
-				persist
-			),
+		const services = buckets.map(([_, id]) => ({
+			name: `${R2_BUCKET_SERVICE_PREFIX}:${id}`,
+			worker: objectEntryWorker(R2_BUCKET_OBJECT, id),
 		}));
+
+		if (buckets.length > 0) {
+			const uniqueKey = `miniflare-${R2_BUCKET_OBJECT_CLASS_NAME}`;
+			const persistPath = getPersistPath(R2_PLUGIN_NAME, tmpPath, persist);
+			await fs.mkdir(persistPath, { recursive: true });
+			const storageService: Service = {
+				name: R2_STORAGE_SERVICE_NAME,
+				disk: { path: persistPath, writable: true },
+			};
+			const objectService: Service = {
+				name: R2_BUCKET_SERVICE_PREFIX,
+				worker: {
+					compatibilityDate: "2023-07-24",
+					compatibilityFlags: ["nodejs_compat", "experimental"],
+					modules: [
+						{
+							name: "bucket.worker.js",
+							esModule: SCRIPT_R2_BUCKET_OBJECT(),
+						},
+					],
+					durableObjectNamespaces: [
+						{
+							className: R2_BUCKET_OBJECT_CLASS_NAME,
+							uniqueKey,
+						},
+					],
+					// Store Durable Object SQL databases in persist path
+					durableObjectStorage: { localDisk: R2_STORAGE_SERVICE_NAME },
+					// Bind blob disk directory service to object
+					bindings: [
+						{
+							name: SharedBindings.MAYBE_SERVICE_BLOBS,
+							service: { name:
R2_STORAGE_SERVICE_NAME }, + }, + ], + }, + }; + services.push(storageService, objectService); + + for (const bucket of buckets) { + await migrateDatabase(log, uniqueKey, persistPath, bucket[1]); + } + } + + return services; }, }; - -export * from "./r2Object"; -export * from "./gateway"; -export * from "./schemas"; -export { _testR2Conditional } from "./validator"; diff --git a/packages/miniflare/src/plugins/r2/router.ts b/packages/miniflare/src/plugins/r2/router.ts deleted file mode 100644 index 74f70d196..000000000 --- a/packages/miniflare/src/plugins/r2/router.ts +++ /dev/null @@ -1,156 +0,0 @@ -import assert from "assert"; -import { ReadableStream } from "stream/web"; -import { Request, Response } from "../../http"; -import { readPrefix } from "../../shared"; -import { - CfHeader, - GET, - PUT, - RouteHandler, - Router, - decodePersist, -} from "../shared"; -import { InternalError, InvalidMetadata } from "./errors"; -import { R2Gateway } from "./gateway"; -import { - EncodedMetadata, - InternalR2Object, - InternalR2ObjectBody, - InternalR2Objects, -} from "./r2Object"; -import { R2BindingRequestSchema } from "./schemas"; - -async function decodeMetadata(req: Request) { - const metadataSize = Number(req.headers.get(CfHeader.MetadataSize)); - if (Number.isNaN(metadataSize)) throw new InvalidMetadata(); - - assert(req.body !== null); - const body = req.body as ReadableStream; - - // Read just metadata from body stream - const [metadataBuffer, value] = await readPrefix(body, metadataSize); - const metadataJson = metadataBuffer.toString(); - const metadata = R2BindingRequestSchema.parse(JSON.parse(metadataJson)); - - return { metadata, metadataSize, value }; -} -function decodeHeaderMetadata(req: Request) { - const header = req.headers.get(CfHeader.Request); - if (header === null) throw new InvalidMetadata(); - return R2BindingRequestSchema.parse(JSON.parse(header)); -} - -function encodeResult( - result: InternalR2Object | InternalR2ObjectBody | InternalR2Objects -) { - let encoded: EncodedMetadata; - if (result instanceof InternalR2Object) { - encoded = result.encode(); - } else { - encoded = InternalR2Object.encodeMultiple(result); - } - - return new Response(encoded.value, { - headers: { - [CfHeader.MetadataSize]: `${encoded.metadataSize}`, - "Content-Type": "application/json", - }, - }); -} - -function encodeJSONResult(result: unknown) { - const encoded = JSON.stringify(result); - return new Response(encoded, { - headers: { - [CfHeader.MetadataSize]: `${Buffer.byteLength(encoded)}`, - "Content-Type": "application/json", - }, - }); -} - -export interface R2Params { - bucket: string; -} - -export class R2Router extends Router { - @GET("/:bucket") - get: RouteHandler = async (req, params) => { - const metadata = decodeHeaderMetadata(req); - const persist = decodePersist(req.headers); - const bucket = decodeURIComponent(params.bucket); - const gateway = this.gatewayFactory.get(bucket, persist); - - let result: InternalR2Object | InternalR2ObjectBody | InternalR2Objects; - if (metadata.method === "head") { - result = await gateway.head(metadata.object); - } else if (metadata.method === "get") { - result = await gateway.get(metadata.object, metadata); - } else if (metadata.method === "list") { - result = await gateway.list(metadata); - } else { - throw new InternalError(); - } - - return encodeResult(result); - }; - - @PUT("/:bucket") - put: RouteHandler = async (req, params) => { - const { metadata, metadataSize, value } = await decodeMetadata(req); - const persist = 
decodePersist(req.headers); - const bucket = decodeURIComponent(params.bucket); - const gateway = this.gatewayFactory.get(bucket, persist); - - if (metadata.method === "delete") { - await gateway.delete( - "object" in metadata ? metadata.object : metadata.objects - ); - return new Response(); - } else if (metadata.method === "put") { - const contentLength = Number(req.headers.get("Content-Length")); - // `workerd` requires a known value size for R2 put requests: - // - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L154-L156 - // - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L188-L189 - assert(!isNaN(contentLength)); - const valueSize = contentLength - metadataSize; - const result = await gateway.put( - metadata.object, - value, - valueSize, - metadata - ); - return encodeResult(result); - } else if (metadata.method === "createMultipartUpload") { - const result = await gateway.createMultipartUpload( - metadata.object, - metadata - ); - return encodeJSONResult(result); - } else if (metadata.method === "uploadPart") { - const contentLength = Number(req.headers.get("Content-Length")); - // `workerd` requires a known value size for R2 put requests as above - assert(!isNaN(contentLength)); - const valueSize = contentLength - metadataSize; - const result = await gateway.uploadPart( - metadata.object, - metadata.uploadId, - metadata.partNumber, - value, - valueSize - ); - return encodeJSONResult(result); - } else if (metadata.method === "completeMultipartUpload") { - const result = await gateway.completeMultipartUpload( - metadata.object, - metadata.uploadId, - metadata.parts - ); - return encodeResult(result); - } else if (metadata.method === "abortMultipartUpload") { - await gateway.abortMultipartUpload(metadata.object, metadata.uploadId); - return new Response(); - } else { - throw new InternalError(); // Unknown method: should never be reached - } - }; -} diff --git a/packages/miniflare/src/plugins/r2/gateway.ts b/packages/miniflare/src/workers/r2/bucket.worker.ts similarity index 65% rename from packages/miniflare/src/plugins/r2/gateway.ts rename to packages/miniflare/src/workers/r2/bucket.worker.ts index 89f5a8fdb..d5ee4a435 100644 --- a/packages/miniflare/src/plugins/r2/gateway.ts +++ b/packages/miniflare/src/workers/r2/bucket.worker.ts @@ -1,37 +1,43 @@ -import assert from "assert"; -import crypto from "crypto"; -import { ReadableStream, TransformStream } from "stream/web"; +import assert from "node:assert"; +import { Buffer } from "node:buffer"; +import { createHash } from "node:crypto"; import { + BlobId, DeferredPromise, - Log, - Timers, + GET, + InclusiveRange, + MiniflareDurableObject, + MiniflareDurableObjectEnv, + PUT, + RouteHandler, + TransactionFactory, + TypedSqlStorage, WaitGroup, + all, base64Decode, base64Encode, - maybeApply, - prefixError, -} from "../../shared"; -import { - BlobId, - InclusiveRange, - Storage, - TypedDatabase, escapeLike, -} from "../../storage"; + get, + maybeApply, + readPrefix, +} from "miniflare:shared"; +import { R2Headers, R2Limits } from "./constants"; import { BadUpload, EntityTooSmall, InternalError, + InvalidMetadata, InvalidPart, NoSuchKey, NoSuchUpload, PreconditionFailed, -} from "./errors"; +} from "./errors.worker"; import { + EncodedMetadata, InternalR2Object, InternalR2ObjectBody, InternalR2Objects, -} from "./r2Object"; +} from "./r2Object.worker"; import { 
InternalR2CreateMultipartUploadOptions, InternalR2GetOptions, @@ -41,19 +47,18 @@ import { MultipartUploadRow, MultipartUploadState, ObjectRow, + R2BindingRequestSchema, R2Conditional, R2CreateMultipartUploadResponse, R2PublishedPart, - R2Range, R2UploadPartResponse, SQL_SCHEMA, -} from "./schemas"; +} from "./schemas.worker"; import { - MAX_LIST_KEYS, - R2Hashes, + DigestAlgorithm, R2_HASH_ALGORITHMS, Validator, -} from "./validator"; +} from "./validator.worker"; // This file implements Miniflare's R2 simulator, supporting both single and // multipart uploads. @@ -91,22 +96,27 @@ import { // uploads. class DigestingStream< - Algorithm extends string = string + Algorithm extends DigestAlgorithm = DigestAlgorithm > extends TransformStream { readonly digests: Promise>; constructor(algorithms: Algorithm[]) { const digests = new DeferredPromise>(); - const hashes = algorithms.map((alg) => crypto.createHash(alg)); + const hashes = algorithms.map((alg) => { + const stream = new crypto.DigestStream(alg); + const writer = stream.getWriter(); + return { stream, writer }; + }); super({ - transform(chunk, controller) { - for (const hash of hashes) hash.update(chunk); + async transform(chunk, controller) { + for (const hash of hashes) await hash.writer.write(chunk); controller.enqueue(chunk); }, - flush() { + async flush() { const result = new Map(); for (let i = 0; i < hashes.length; i++) { - result.set(algorithms[i], hashes[i].digest()); + await hashes[i].writer.close(); + result.set(algorithms[i], Buffer.from(await hashes[i].stream.digest)); } digests.resolve(result); }, @@ -116,17 +126,22 @@ class DigestingStream< } const validate = new Validator(); +const decoder = new TextDecoder(); function generateVersion() { - return crypto.randomBytes(16).toString("hex"); + return Buffer.from(crypto.getRandomValues(new Uint8Array(16))).toString( + "hex" + ); } function generateId() { - return crypto.randomBytes(128).toString("base64url"); + return Buffer.from(crypto.getRandomValues(new Uint8Array(128))).toString( + "base64url" + ); } function generateMultipartEtag(md5Hexes: string[]) { // https://stackoverflow.com/a/19896823 - const hash = crypto.createHash("md5"); - for (const md5Hex of md5Hexes) hash.update(Buffer.from(md5Hex, "hex")); + const hash = createHash("md5"); + for (const md5Hex of md5Hexes) hash.update(md5Hex, "hex"); return `${hash.digest("hex")}-${md5Hexes.length}`; } @@ -134,24 +149,83 @@ function rangeOverlaps(a: InclusiveRange, b: InclusiveRange): boolean { return a.start <= b.end && b.start <= a.end; } -function sqlStmts(db: TypedDatabase) { +async function decodeMetadata(req: Request) { + // Safety of `!`: `parseInt(null)` is `NaN` + const metadataSize = parseInt(req.headers.get(R2Headers.METADATA_SIZE)!); + if (Number.isNaN(metadataSize)) throw new InvalidMetadata(); + + assert(req.body !== null); + const body = req.body as ReadableStream; + + // Read just metadata from body stream + const [metadataBuffer, value] = await readPrefix(body, metadataSize); + const metadataJson = decoder.decode(metadataBuffer); + const metadata = R2BindingRequestSchema.parse(JSON.parse(metadataJson)); + + return { metadata, metadataSize, value }; +} +function decodeHeaderMetadata(req: Request) { + const header = req.headers.get(R2Headers.REQUEST); + if (header === null) throw new InvalidMetadata(); + return R2BindingRequestSchema.parse(JSON.parse(header)); +} + +function encodeResult( + result: InternalR2Object | InternalR2ObjectBody | InternalR2Objects +) { + let encoded: EncodedMetadata; + if (result 
instanceof InternalR2Object) { + encoded = result.encode(); + } else { + encoded = InternalR2Object.encodeMultiple(result); + } + + return new Response(encoded.value, { + headers: { + [R2Headers.METADATA_SIZE]: `${encoded.metadataSize}`, + "Content-Type": "application/json", + }, + }); +} +function encodeJSONResult(result: unknown) { + const encoded = JSON.stringify(result); + return new Response(encoded, { + headers: { + [R2Headers.METADATA_SIZE]: `${Buffer.byteLength(encoded)}`, + "Content-Type": "application/json", + }, + }); +} + +function sqlStmts(db: TypedSqlStorage, txn: TransactionFactory) { const stmtGetPreviousByKey = db.prepare< - Pick, + [key_1: string], Pick - >("SELECT blob_id, etag, uploaded FROM _mf_objects WHERE key = :key"); + >("SELECT blob_id, etag, uploaded FROM _mf_objects WHERE key = ?1"); // Regular statements - const stmtGetByKey = db.prepare, ObjectRow>(` + const stmtGetByKey = db.prepare<[key_1: string], ObjectRow>(` SELECT key, blob_id, version, size, etag, uploaded, checksums, http_metadata, custom_metadata - FROM _mf_objects WHERE key = :key + FROM _mf_objects WHERE key = ?1 `); - const stmtPut = db.prepare(` + const stmtPut = db.prepare< + [ + key_1: string, + blob_id_2: string | null, + version_3: string, + size_4: number, + etag_5: string, + uploaded_6: number, + checksums_7: string, + http_metadata_8: string, + custom_metadata_9: string + ] + >(` INSERT OR REPLACE INTO _mf_objects (key, blob_id, version, size, etag, uploaded, checksums, http_metadata, custom_metadata) - VALUES (:key, :blob_id, :version, :size, :etag, :uploaded, :checksums, :http_metadata, :custom_metadata) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) `); - const stmtDelete = db.prepare< - Pick, - Pick - >("DELETE FROM _mf_objects WHERE key = :key RETURNING blob_id"); + const stmtDelete = db.prepare<[key_1: string], Pick>( + "DELETE FROM _mf_objects WHERE key = ?1 RETURNING blob_id" + ); function stmtListWithoutDelimiter( ...extraColumns: ExtraColumns @@ -167,136 +241,155 @@ function sqlStmts(db: TypedDatabase) { ]; // TODO: consider applying same `:start_after IS NULL` trick to KeyValueStore return db.prepare< - { limit: number; escaped_prefix: string; start_after: string | null }, + [limit_1: number, escaped_prefix_2: string, start_after_3: string | null], Omit & Pick >(` SELECT ${columns.join(", ")} FROM _mf_objects - WHERE key LIKE :escaped_prefix || '%' ESCAPE '\\' - AND (:start_after IS NULL OR key > :start_after) - ORDER BY key LIMIT :limit + WHERE key LIKE ?2 || '%' ESCAPE '\\' + AND (?3 IS NULL OR key > ?3) + ORDER BY key LIMIT ?1 `); } // Multipart upload statements const stmtGetUploadState = db.prepare< - Pick, + [upload_id_1: string, key_2: string], Pick >( // For checking current upload state - "SELECT state FROM _mf_multipart_uploads WHERE upload_id = :upload_id AND key = :key" + "SELECT state FROM _mf_multipart_uploads WHERE upload_id = ?1 AND key = ?2" ); const stmtGetUploadMetadata = db.prepare< - Pick, + [upload_id_1: string, key_2: string], Pick >( // For checking current upload state, and getting metadata for completion - "SELECT http_metadata, custom_metadata, state FROM _mf_multipart_uploads WHERE upload_id = :upload_id AND key = :key" + "SELECT http_metadata, custom_metadata, state FROM _mf_multipart_uploads WHERE upload_id = ?1 AND key = ?2" ); const stmtUpdateUploadState = db.prepare< - Pick + [upload_id_1: string, state_2: MultipartUploadRow["state"]] >( // For completing/aborting uploads - "UPDATE _mf_multipart_uploads SET state = :state WHERE upload_id = 
:upload_id" + "UPDATE _mf_multipart_uploads SET state = ?2 WHERE upload_id = ?1" ); // Multipart part statements const stmtGetPreviousPartByNumber = db.prepare< - Pick, + [upload_id_1: string, part_number_2: number], Pick >( // For getting part number's previous blob ID to garbage collect - "SELECT blob_id FROM _mf_multipart_parts WHERE upload_id = :upload_id AND part_number = :part_number" + "SELECT blob_id FROM _mf_multipart_parts WHERE upload_id = ?1 AND part_number = ?2" ); - const stmtPutPart = db.prepare>( + const stmtPutPart = db.prepare< + [ + upload_id_1: string, + part_number_2: number, + blob_id_3: BlobId, + size_4: number, + etag_5: string, + checksum_md5_6: string + ] + >( // For recording metadata when uploading parts `INSERT OR REPLACE INTO _mf_multipart_parts (upload_id, part_number, blob_id, size, etag, checksum_md5) - VALUES (:upload_id, :part_number, :blob_id, :size, :etag, :checksum_md5)` + VALUES (?1, ?2, ?3, ?4, ?5, ?6)` ); const stmtLinkPart = db.prepare< - Pick + [upload_id_1: string, part_number_2: number, object_key_3: string] >( // For linking parts with an object when completing uploads - `UPDATE _mf_multipart_parts SET object_key = :object_key - WHERE upload_id = :upload_id AND part_number = :part_number` + `UPDATE _mf_multipart_parts SET object_key = ?3 + WHERE upload_id = ?1 AND part_number = ?2` ); const stmtDeletePartsByUploadId = db.prepare< - Pick, + [upload_id_1: string], Pick >( // For deleting parts when aborting uploads - "DELETE FROM _mf_multipart_parts WHERE upload_id = :upload_id RETURNING blob_id" + "DELETE FROM _mf_multipart_parts WHERE upload_id = ?1 RETURNING blob_id" ); const stmtDeleteUnlinkedPartsByUploadId = db.prepare< - Pick, + [upload_id_1: string], Pick >( // For deleting unused parts when completing uploads - "DELETE FROM _mf_multipart_parts WHERE upload_id = :upload_id AND object_key IS NULL RETURNING blob_id" + "DELETE FROM _mf_multipart_parts WHERE upload_id = ?1 AND object_key IS NULL RETURNING blob_id" ); const stmtDeletePartsByKey = db.prepare< - Pick, + [object_key_1: string], Pick >( // For deleting dangling parts when overwriting an existing key - "DELETE FROM _mf_multipart_parts WHERE object_key = :object_key RETURNING blob_id" + "DELETE FROM _mf_multipart_parts WHERE object_key = ?1 RETURNING blob_id" ); const stmtListPartsByUploadId = db.prepare< - Pick, + [upload_id_1: string], Omit >( // For getting part metadata when completing uploads `SELECT upload_id, part_number, blob_id, size, etag, checksum_md5, object_key - FROM _mf_multipart_parts WHERE upload_id = :upload_id` + FROM _mf_multipart_parts WHERE upload_id = ?1` ); const stmtListPartsByKey = db.prepare< - Pick, + [object_key_1: string], Pick >( // For getting part metadata when getting values, size included for range // requests, so we only need to read blobs containing the required data - "SELECT blob_id, size FROM _mf_multipart_parts WHERE object_key = :object_key ORDER BY part_number" + "SELECT blob_id, size FROM _mf_multipart_parts WHERE object_key = ?1 ORDER BY part_number" ); return { getByKey: stmtGetByKey, - getPartsByKey: db.transaction((key: string) => { - const row = stmtGetByKey.get({ key }); + getPartsByKey: txn((key: string) => { + const row = get(stmtGetByKey(key)); if (row === undefined) return; if (row.blob_id === null) { // If this is a multipart object, also return the parts - const partsRows = stmtListPartsByKey.all({ object_key: key }); + const partsRows = all(stmtListPartsByKey(key)); return { row, parts: partsRows }; } else { // Otherwise, just 
return the row return { row }; } }), - put: db.transaction((newRow: ObjectRow, onlyIf?: R2Conditional) => { + put: txn((newRow: ObjectRow, onlyIf?: R2Conditional) => { const key = newRow.key; - const row = stmtGetPreviousByKey.get({ key }); + const row = get(stmtGetPreviousByKey(key)); if (onlyIf !== undefined) validate.condition(row, onlyIf); - stmtPut.run(newRow); + stmtPut( + newRow.key, + newRow.blob_id, + newRow.version, + newRow.size, + newRow.etag, + newRow.uploaded, + newRow.checksums, + newRow.http_metadata, + newRow.custom_metadata + ); const maybeOldBlobId = row?.blob_id; if (maybeOldBlobId === undefined) { return []; } else if (maybeOldBlobId === null) { // If blob_id is null, this was a multipart object, so delete all // multipart parts - const rows = stmtDeletePartsByKey.all({ object_key: key }); + const rows = all(stmtDeletePartsByKey(key)); return rows.map(({ blob_id }) => blob_id); } else { return [maybeOldBlobId]; } }), - deleteByKeys: db.transaction((keys: string[]) => { + deleteByKeys: txn((keys: string[]) => { const oldBlobIds: string[] = []; for (const key of keys) { - const row = stmtDelete.get({ key }); + const row = get(stmtDelete(key)); const maybeOldBlobId = row?.blob_id; if (maybeOldBlobId === null) { // If blob_id is null, this was a multipart object, so delete all // multipart parts - const partRows = stmtDeletePartsByKey.all({ object_key: key }); + const partRows = stmtDeletePartsByKey(key); for (const partRow of partRows) oldBlobIds.push(partRow.blob_id); } else if (maybeOldBlobId !== undefined) { oldBlobIds.push(maybeOldBlobId); @@ -314,13 +407,13 @@ function sqlStmts(db: TypedDatabase) { "custom_metadata" ), listMetadata: db.prepare< - { - limit: number; - escaped_prefix: string; - start_after: string | null; - prefix: string; - delimiter: string; - }, + [ + limit_1: number, + escaped_prefix_2: string, + start_after_3: string | null, + prefix_4: string, + delimiter_5: string + ], Omit & { last_key: string; delimited_prefix_or_key: `dlp:${string}` | `key:${string}`; @@ -334,49 +427,64 @@ function sqlStmts(db: TypedDatabase) { max(key) AS last_key, iif( -- Try get 1-indexed position \`i\` of :delimiter in rest of key after :prefix... 
- instr(substr(key, length(:prefix) + 1), :delimiter), + instr(substr(key, length(?4) + 1), ?5), -- ...if found, we have a delimited prefix of the :prefix followed by the rest of key up to and including the :delimiter - 'dlp:' || substr(key, 1, length(:prefix) + instr(substr(key, length(:prefix) + 1), :delimiter) + length(:delimiter) - 1), + 'dlp:' || substr(key, 1, length(?4) + instr(substr(key, length(?4) + 1), ?5) + length(?5) - 1), -- ...otherwise, we just have a regular key 'key:' || key ) AS delimited_prefix_or_key, -- NOTE: we'll ignore metadata for delimited prefix rows, so it doesn't matter which keys' we return version, size, etag, uploaded, checksums, http_metadata, custom_metadata FROM _mf_objects - WHERE key LIKE :escaped_prefix || '%' ESCAPE '\\' - AND (:start_after IS NULL OR key > :start_after) - GROUP BY delimited_prefix_or_key -- Group keys with same delimited prefix into a row, leaving otherS in their own rows - ORDER BY last_key LIMIT :limit; + WHERE key LIKE ?2 || '%' ESCAPE '\\' + AND (?3 IS NULL OR key > ?3) + GROUP BY delimited_prefix_or_key -- Group keys with same delimited prefix into a row, leaving others in their own rows + ORDER BY last_key LIMIT ?1; `), - createMultipartUpload: db.prepare>(` + createMultipartUpload: db.prepare< + [ + upload_id_1: string, + key_2: string, + http_metadata_3: string, + custom_metadata_4: string + ] + >(` INSERT INTO _mf_multipart_uploads (upload_id, key, http_metadata, custom_metadata) - VALUES (:upload_id, :key, :http_metadata, :custom_metadata) + VALUES (?1, ?2, ?3, ?4) `), - putPart: db.transaction( + putPart: txn( (key: string, newRow: Omit) => { // 1. Check the upload exists and is in-progress - const uploadRow = stmtGetUploadState.get({ - key, - upload_id: newRow.upload_id, - }); + const uploadRow = get(stmtGetUploadState(newRow.upload_id, key)); if (uploadRow?.state !== MultipartUploadState.IN_PROGRESS) { throw new NoSuchUpload(); } // 2. Check if we have an existing part with this number, then upsert - const partRow = stmtGetPreviousPartByNumber.get({ - upload_id: newRow.upload_id, - part_number: newRow.part_number, - }); - stmtPutPart.run(newRow); + const partRow = get( + stmtGetPreviousPartByNumber(newRow.upload_id, newRow.part_number) + ); + stmtPutPart( + newRow.upload_id, + newRow.part_number, + newRow.blob_id, + newRow.size, + newRow.etag, + newRow.checksum_md5 + ); return partRow?.blob_id; } ), - completeMultipartUpload: db.transaction( - (key: string, upload_id: string, selectedParts: R2PublishedPart[]) => { + completeMultipartUpload: txn( + ( + key: string, + upload_id: string, + selectedParts: R2PublishedPart[], + minPartSize: number + ) => { // 1. Check the upload exists and is in-progress - const uploadRow = stmtGetUploadMetadata.get({ key, upload_id }); + const uploadRow = get(stmtGetUploadMetadata(upload_id, key)); if (uploadRow === undefined) { throw new InternalError(); } else if (uploadRow.state > MultipartUploadState.IN_PROGRESS) { @@ -392,7 +500,7 @@ function sqlStmts(db: TypedDatabase) { // 3. Get metadata for all uploaded parts, checking all selected parts // exist - const uploadedPartRows = stmtListPartsByUploadId.all({ upload_id }); + const uploadedPartRows = stmtListPartsByUploadId(upload_id); const uploadedParts = new Map< /* part number */ number, Omit @@ -417,7 +525,7 @@ function sqlStmts(db: TypedDatabase) { // 4. Check all but last part meets minimum size requirements. First // check this in argument order, throwing a friendly error... 
for (const part of parts.slice(0, -1)) { - if (part.size < R2Gateway._MIN_MULTIPART_PART_SIZE) { + if (part.size < minPartSize) { throw new EntityTooSmall(); } } @@ -431,10 +539,7 @@ function sqlStmts(db: TypedDatabase) { for (const part of parts.slice(0, -1)) { // noinspection JSUnusedAssignment partSize ??= part.size; - if ( - part.size < R2Gateway._MIN_MULTIPART_PART_SIZE || - part.size !== partSize - ) { + if (part.size < minPartSize || part.size !== partSize) { throw new BadUpload(); } } @@ -446,12 +551,12 @@ function sqlStmts(db: TypedDatabase) { // 5. Get existing upload if any, and delete previous multipart parts const oldBlobIds: string[] = []; - const existingRow = stmtGetPreviousByKey.get({ key }); + const existingRow = get(stmtGetPreviousByKey(key)); const maybeOldBlobId = existingRow?.blob_id; if (maybeOldBlobId === null) { // If blob_id is null, this was a multipart object, so delete all // multipart parts - const partRows = stmtDeletePartsByKey.all({ object_key: key }); + const partRows = stmtDeletePartsByKey(key); for (const partRow of partRows) oldBlobIds.push(partRow.blob_id); } else if (maybeOldBlobId !== undefined) { oldBlobIds.push(maybeOldBlobId); @@ -473,31 +578,34 @@ function sqlStmts(db: TypedDatabase) { http_metadata: uploadRow.http_metadata, custom_metadata: uploadRow.custom_metadata, }; - stmtPut.run(newRow); + stmtPut( + newRow.key, + newRow.blob_id, + newRow.version, + newRow.size, + newRow.etag, + newRow.uploaded, + newRow.checksums, + newRow.http_metadata, + newRow.custom_metadata + ); for (const part of parts) { - stmtLinkPart.run({ - upload_id, - part_number: part.part_number, - object_key: key, - }); + stmtLinkPart(upload_id, part.part_number, key); } // 7. Delete unlinked, unused parts - const partRows = stmtDeleteUnlinkedPartsByUploadId.all({ upload_id }); + const partRows = stmtDeleteUnlinkedPartsByUploadId(upload_id); for (const partRow of partRows) oldBlobIds.push(partRow.blob_id); // 8. Mark the upload as completed - stmtUpdateUploadState.run({ - upload_id, - state: MultipartUploadState.COMPLETED, - }); + stmtUpdateUploadState(upload_id, MultipartUploadState.COMPLETED); return { newRow, oldBlobIds }; } ), - abortMultipartUpload: db.transaction((key: string, upload_id: string) => { + abortMultipartUpload: txn((key: string, upload_id: string) => { // 1. Make sure this multipart upload exists, ignoring finalised states - const uploadRow = stmtGetUploadState.get({ key, upload_id }); + const uploadRow = get(stmtGetUploadState(upload_id, key)); if (uploadRow === undefined) { throw new InternalError(); } else if (uploadRow.state > MultipartUploadState.IN_PROGRESS) { @@ -508,27 +616,19 @@ function sqlStmts(db: TypedDatabase) { } // 2. Delete all parts in the upload - const partRows = stmtDeletePartsByUploadId.all({ upload_id }); + const partRows = all(stmtDeletePartsByUploadId(upload_id)); const oldBlobIds = partRows.map(({ blob_id }) => blob_id); // 3. 
Mark the uploaded as aborted - stmtUpdateUploadState.run({ - upload_id, - state: MultipartUploadState.ABORTED, - }); + stmtUpdateUploadState(upload_id, MultipartUploadState.ABORTED); return oldBlobIds; }), }; } -export class R2Gateway { - // Minimum multipart part upload size is configurable, so we can use smaller - // values in tests - /** @internal */ - static _MIN_MULTIPART_PART_SIZE = 5 * 1024 * 1024; - - readonly #storage: Storage; +// noinspection JSUnusedGlobalSymbols +export class R2BucketObject extends MiniflareDurableObject { readonly #stmts: ReturnType; // Multipart uploads are stored as multiple blobs. Therefore, when reading a @@ -553,15 +653,11 @@ export class R2Gateway { // some inter-process signalling/subscription system. readonly #inUseBlobs = new Map(); - constructor( - private readonly log: Log, - storage: Storage, - private readonly timers: Timers - ) { - this.#storage = storage; - this.#storage.db.pragma("case_sensitive_like = TRUE"); - this.#storage.db.exec(SQL_SCHEMA); - this.#stmts = sqlStmts(this.#storage.db); + constructor(state: DurableObjectState, env: MiniflareDurableObjectEnv) { + super(state, env); + this.db.exec("PRAGMA case_sensitive_like = TRUE"); + this.db.exec(SQL_SCHEMA); + this.#stmts = sqlStmts(this.db, this.txn); } #acquireBlob(blobId: BlobId) { @@ -585,14 +681,13 @@ export class R2Gateway { this.timers.queueMicrotask(async () => { // Wait for all multipart gets using this blob to complete await this.#inUseBlobs.get(blobId)?.wait(); - return this.#storage.blob.delete(blobId).catch((e) => { - this.log.error(prefixError("Deleting Blob", e)); + return this.blob.delete(blobId).catch((e) => { + console.error("R2BucketObject##backgroundDelete():", e); }); }); } #assembleMultipartValue( - storage: Storage, parts: Pick[], queryRange: InclusiveRange ): ReadableStream { @@ -617,6 +712,13 @@ export class R2Gateway { // Stream required parts, the `Promise`s returned from `pipeTo()` won't // resolve until a reader starts reading, so run this in the background as // an async IIFE. + // + // NOTE: we can't use `IdentityTransformStream` here as piping the readable + // side of an `IdentityTransformStream` to the writable side of another + // `IdentityTransformStream` is not supported: + // https://github.com/cloudflare/workerd/blob/c6f439ca37c5fa34acc54a6df79214ae029ddf9f/src/workerd/api/streams/internal.c%2B%2B#L169 + // We'll be piping to an `IdentityTransformStream` when we encode the + // metadata followed by this stream as the response body. const identity = new TransformStream(); (async () => { let i = 0; @@ -627,7 +729,7 @@ export class R2Gateway { // that blob (and the rest) will be released in the `finally`. 
for (; i < requiredParts.length; i++) { const { blobId, range } = requiredParts[i]; - const value = await storage.blob.get(blobId, range); + const value = await this.blob.get(blobId, range); const msg = `Expected to find blob "${blobId}" for multipart value`; assert(value !== null, msg); await value.pipeTo(identity.writable, { preventClose: true }); @@ -635,7 +737,7 @@ export class R2Gateway { } await identity.writable.close(); } catch (e) { - identity.writable.abort(e); + await identity.writable.abort(e); } finally { for (; i < requiredParts.length; i++) { this.#releaseBlob(requiredParts[i].blobId); @@ -645,19 +747,19 @@ export class R2Gateway { return identity.readable; } - async head(key: string): Promise { + async #head(key: string): Promise { validate.key(key); - const row = this.#stmts.getByKey.get({ key }); + const row = get(this.#stmts.getByKey(key)); if (row === undefined) throw new NoSuchKey(); const range: R2Range = { offset: 0, length: row.size }; return new InternalR2Object(row, range); } - async get( + async #get( key: string, - options: InternalR2GetOptions = {} + opts: InternalR2GetOptions ): Promise { validate.key(key); @@ -669,7 +771,7 @@ export class R2Gateway { // Validate pre-condition const defaultR2Range: R2Range = { offset: 0, length: row.size }; try { - validate.condition(row, options.onlyIf); + validate.condition(row, opts.onlyIf); } catch (e) { if (e instanceof PreconditionFailed) { e.attach(new InternalR2Object(row, defaultR2Range)); @@ -678,7 +780,7 @@ export class R2Gateway { } // Validate range, and convert to R2 range for return - const range = validate.range(options, row.size); + const range = validate.range(opts, row.size); let r2Range: R2Range; if (range === undefined) { r2Range = defaultR2Range; @@ -693,45 +795,41 @@ export class R2Gateway { // If this is a multipart object, we should've fetched multipart parts assert(parts !== undefined); const defaultRange = { start: 0, end: row.size - 1 }; - value = this.#assembleMultipartValue( - this.#storage, - parts, - range ?? defaultRange - ); + value = this.#assembleMultipartValue(parts, range ?? 
defaultRange); } else { // Otherwise, just return a single part value - value = await this.#storage.blob.get(row.blob_id, range); + value = await this.blob.get(row.blob_id, range); if (value === null) throw new NoSuchKey(); } return new InternalR2ObjectBody(row, value, r2Range); } - async put( + async #put( key: string, value: ReadableStream, valueSize: number, - options: InternalR2PutOptions + opts: InternalR2PutOptions ): Promise { // Store value in the blob store, computing required digests as we go // (this means we don't have to buffer the entire stream to compute them) - const algorithms: (keyof R2Hashes)[] = []; - for (const { field } of R2_HASH_ALGORITHMS) { + const algorithms: DigestAlgorithm[] = []; + for (const { name, field } of R2_HASH_ALGORITHMS) { // Always compute MD5 digest - if (field === "md5" || field in options) algorithms.push(field); + if (field === "md5" || opts[field] !== undefined) algorithms.push(name); } const digesting = new DigestingStream(algorithms); - const blobId = await this.#storage.blob.put(value.pipeThrough(digesting)); + const blobId = await this.blob.put(value.pipeThrough(digesting)); const digests = await digesting.digests; - const md5Digest = digests.get("md5"); + const md5Digest = digests.get("MD5"); assert(md5Digest !== undefined); const md5DigestHex = md5Digest.toString("hex"); const checksums = validate .key(key) .size(valueSize) - .metadataSize(options.customMetadata) - .hash(digests, options); + .metadataSize(opts.customMetadata) + .hash(digests, opts); const row: ObjectRow = { key, blob_id: blobId, @@ -740,12 +838,12 @@ export class R2Gateway { etag: md5DigestHex, uploaded: Date.now(), checksums: JSON.stringify(checksums), - http_metadata: JSON.stringify(options.httpMetadata ?? {}), - custom_metadata: JSON.stringify(options.customMetadata ?? {}), + http_metadata: JSON.stringify(opts.httpMetadata ?? {}), + custom_metadata: JSON.stringify(opts.customMetadata ?? {}), }; let oldBlobIds: string[] | undefined; try { - oldBlobIds = this.#stmts.put(row, options.onlyIf); + oldBlobIds = this.#stmts.put(row, opts.onlyIf); } catch (e) { // Probably precondition failed. In any case, the put transaction failed, // so we're not storing a reference to the blob ID @@ -758,7 +856,7 @@ export class R2Gateway { return new InternalR2Object(row); } - async delete(keys: string | string[]) { + #delete(keys: string | string[]) { if (!Array.isArray(keys)) keys = [keys]; for (const key of keys) validate.key(key); const oldBlobIds = this.#stmts.deleteByKeys(keys); @@ -772,10 +870,10 @@ export class R2Gateway { return this.#stmts.listHttpCustomMetadataWithoutDelimiter; } - async list(opts: InternalR2ListOptions = {}): Promise { + async #list(opts: InternalR2ListOptions): Promise { const prefix = opts.prefix ?? ""; - let limit = opts.limit ?? MAX_LIST_KEYS; + let limit = opts.limit ?? 
R2Limits.MAX_LIST_KEYS; validate.limit(limit); // If metadata is requested, R2 may return fewer than `limit` results to @@ -828,11 +926,15 @@ export class R2Gateway { let nextCursorStartAfter: string | undefined; if (delimiter !== undefined) { - const rows = this.#stmts.listMetadata.all({ - ...params, - prefix, - delimiter, - }); + const rows = all( + this.#stmts.listMetadata( + params.limit, + params.escaped_prefix, + params.start_after, + prefix, + delimiter + ) + ); // If there are more results, we'll be returning a cursor const hasMoreRows = rows.length === limit + 1; @@ -851,7 +953,9 @@ export class R2Gateway { } else { // If we don't have a delimiter, we can use a more efficient query const query = this.#listWithoutDelimiterQuery(excludeHttp, excludeCustom); - const rows = query.all(params); + const rows = all( + query(params.limit, params.escaped_prefix, params.start_after) + ); // If there are more results, we'll be returning a cursor const hasMoreRows = rows.length === limit + 1; @@ -874,23 +978,23 @@ export class R2Gateway { }; } - async createMultipartUpload( + async #createMultipartUpload( key: string, opts: InternalR2CreateMultipartUploadOptions ): Promise { validate.key(key); const uploadId = generateId(); - this.#stmts.createMultipartUpload.run({ + this.#stmts.createMultipartUpload( + uploadId, key, - upload_id: uploadId, - http_metadata: JSON.stringify(opts.httpMetadata ?? {}), - custom_metadata: JSON.stringify(opts.customMetadata ?? {}), - }); + JSON.stringify(opts.httpMetadata ?? {}), + JSON.stringify(opts.customMetadata ?? {}) + ); return { uploadId }; } - async uploadPart( + async #uploadPart( key: string, uploadId: string, partNumber: number, @@ -900,10 +1004,10 @@ export class R2Gateway { validate.key(key); // Store value in the blob store, computing MD5 digest as we go - const digesting = new DigestingStream(["md5"]); - const blobId = await this.#storage.blob.put(value.pipeThrough(digesting)); + const digesting = new DigestingStream(["MD5"]); + const blobId = await this.blob.put(value.pipeThrough(digesting)); const digests = await digesting.digests; - const md5Digest = digests.get("md5"); + const md5Digest = digests.get("MD5"); assert(md5Digest !== undefined); // Generate random ETag for this part @@ -932,24 +1036,105 @@ export class R2Gateway { return { etag }; } - async completeMultipartUpload( + async #completeMultipartUpload( key: string, uploadId: string, parts: R2PublishedPart[] ): Promise { validate.key(key); + const minPartSize = this.beingTested + ? 
R2Limits.MIN_MULTIPART_PART_SIZE_TEST + : R2Limits.MIN_MULTIPART_PART_SIZE; const { newRow, oldBlobIds } = this.#stmts.completeMultipartUpload( key, uploadId, - parts + parts, + minPartSize ); for (const blobId of oldBlobIds) this.#backgroundDelete(blobId); return new InternalR2Object(newRow); } - async abortMultipartUpload(key: string, uploadId: string): Promise { + async #abortMultipartUpload(key: string, uploadId: string): Promise { validate.key(key); const oldBlobIds = this.#stmts.abortMultipartUpload(key, uploadId); for (const blobId of oldBlobIds) this.#backgroundDelete(blobId); } + + @GET("/") + get: RouteHandler = async (req) => { + const metadata = decodeHeaderMetadata(req); + + let result: InternalR2Object | InternalR2ObjectBody | InternalR2Objects; + if (metadata.method === "head") { + result = await this.#head(metadata.object); + } else if (metadata.method === "get") { + result = await this.#get(metadata.object, metadata); + } else if (metadata.method === "list") { + result = await this.#list(metadata); + } else { + throw new InternalError(); + } + + return encodeResult(result); + }; + + @PUT("/") + put: RouteHandler = async (req) => { + const { metadata, metadataSize, value } = await decodeMetadata(req); + + if (metadata.method === "delete") { + await this.#delete( + "object" in metadata ? metadata.object : metadata.objects + ); + return new Response(); + } else if (metadata.method === "put") { + // Safety of `!`: `parseInt(null)` is `NaN` + const contentLength = parseInt(req.headers.get("Content-Length")!); + // `workerd` requires a known value size for R2 put requests: + // - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L154-L156 + // - https://github.com/cloudflare/workerd/blob/e3479895a2ace28e4fd5f1399cea4c92291966ab/src/workerd/api/r2-rpc.c%2B%2B#L188-L189 + assert(!isNaN(contentLength)); + const valueSize = contentLength - metadataSize; + const result = await this.#put( + metadata.object, + value, + valueSize, + metadata + ); + return encodeResult(result); + } else if (metadata.method === "createMultipartUpload") { + const result = await this.#createMultipartUpload( + metadata.object, + metadata + ); + return encodeJSONResult(result); + } else if (metadata.method === "uploadPart") { + // Safety of `!`: `parseInt(null)` is `NaN` + const contentLength = parseInt(req.headers.get("Content-Length")!); + // `workerd` requires a known value size for R2 put requests as above + assert(!isNaN(contentLength)); + const valueSize = contentLength - metadataSize; + const result = await this.#uploadPart( + metadata.object, + metadata.uploadId, + metadata.partNumber, + value, + valueSize + ); + return encodeJSONResult(result); + } else if (metadata.method === "completeMultipartUpload") { + const result = await this.#completeMultipartUpload( + metadata.object, + metadata.uploadId, + metadata.parts + ); + return encodeResult(result); + } else if (metadata.method === "abortMultipartUpload") { + await this.#abortMultipartUpload(metadata.object, metadata.uploadId); + return new Response(); + } else { + throw new InternalError(); // Unknown method: should never be reached + } + }; } diff --git a/packages/miniflare/src/workers/r2/constants.ts b/packages/miniflare/src/workers/r2/constants.ts new file mode 100644 index 000000000..f5647fbbb --- /dev/null +++ b/packages/miniflare/src/workers/r2/constants.ts @@ -0,0 +1,15 @@ +export const R2Limits = { + MAX_LIST_KEYS: 1_000, + MAX_KEY_SIZE: 1024, + // 
https://developers.cloudflare.com/r2/platform/limits/ + MAX_VALUE_SIZE: 5_000_000_000 - 5_000_000, // 5GB - 5MB + MAX_METADATA_SIZE: 2048, // 2048B + MIN_MULTIPART_PART_SIZE: 5 * 1024 * 1024, + MIN_MULTIPART_PART_SIZE_TEST: 50, +} as const; + +export const R2Headers = { + ERROR: "cf-r2-error", + REQUEST: "cf-r2-request", + METADATA_SIZE: "cf-r2-metadata-size", +} as const; diff --git a/packages/miniflare/src/plugins/r2/errors.ts b/packages/miniflare/src/workers/r2/errors.worker.ts similarity index 67% rename from packages/miniflare/src/plugins/r2/errors.ts rename to packages/miniflare/src/workers/r2/errors.worker.ts index 69889f55e..064155ae7 100644 --- a/packages/miniflare/src/plugins/r2/errors.ts +++ b/packages/miniflare/src/workers/r2/errors.worker.ts @@ -1,31 +1,24 @@ -import { Response } from "../../http"; -import { HttpError } from "../../shared"; -import { CfHeader } from "../shared/constants"; -import { InternalR2Object } from "./r2Object"; - -enum Status { - BadRequest = 400, - NotFound = 404, - PreconditionFailed = 412, - RangeNotSatisfiable = 416, - InternalError = 500, -} -enum CfCode { - InternalError = 10001, - NoSuchObjectKey = 10007, - EntityTooLarge = 100100, - EntityTooSmall = 10011, - MetadataTooLarge = 10012, - InvalidObjectName = 10020, - InvalidMaxKeys = 10022, - NoSuchUpload = 10024, - InvalidPart = 10025, - InvalidArgument = 10029, - PreconditionFailed = 10031, - BadDigest = 10037, - InvalidRange = 10039, - BadUpload = 10048, -} +import { Buffer } from "node:buffer"; +import { HttpError } from "miniflare:shared"; +import { R2Headers } from "./constants"; +import { InternalR2Object } from "./r2Object.worker"; + +const R2ErrorCode = { + INTERNAL_ERROR: 10001, + NO_SUCH_OBJECT_KEY: 10007, + ENTITY_TOO_LARGE: 100100, + ENTITY_TOO_SMALL: 10011, + METADATA_TOO_LARGE: 10012, + INVALID_OBJECT_NAME: 10020, + INVALID_MAX_KEYS: 10022, + NO_SUCH_UPLOAD: 10024, + INVALID_PART: 10025, + INVALID_ARGUMENT: 10029, + PRECONDITION_FAILED: 10031, + BAD_DIGEST: 10037, + INVALID_RANGE: 10039, + BAD_UPLOAD: 10048, +} as const; export class R2Error extends HttpError { object?: InternalR2Object; @@ -40,9 +33,9 @@ export class R2Error extends HttpError { return new Response(value, { status: this.code, headers: { - [CfHeader.MetadataSize]: `${metadataSize}`, + [R2Headers.METADATA_SIZE]: `${metadataSize}`, "Content-Type": "application/json", - [CfHeader.Error]: JSON.stringify({ + [R2Headers.ERROR]: JSON.stringify({ message: this.message, version: 1, // Note the lowercase 'c', which the runtime expects @@ -54,7 +47,7 @@ export class R2Error extends HttpError { return new Response(null, { status: this.code, headers: { - [CfHeader.Error]: JSON.stringify({ + [R2Headers.ERROR]: JSON.stringify({ message: this.message, version: 1, // Note the lowercase 'c', which the runtime expects @@ -77,29 +70,25 @@ export class R2Error extends HttpError { export class InvalidMetadata extends R2Error { constructor() { - super( - Status.BadRequest, - "Metadata missing or invalid", - CfCode.InvalidArgument - ); + super(400, "Metadata missing or invalid", R2ErrorCode.INVALID_ARGUMENT); } } export class InternalError extends R2Error { constructor() { super( - Status.InternalError, + 500, "We encountered an internal error. 
Please try again.", - CfCode.InternalError + R2ErrorCode.INTERNAL_ERROR ); } } export class NoSuchKey extends R2Error { constructor() { super( - Status.NotFound, + 404, "The specified key does not exist.", - CfCode.NoSuchObjectKey + R2ErrorCode.NO_SUCH_OBJECT_KEY ); } } @@ -107,9 +96,9 @@ export class NoSuchKey extends R2Error { export class EntityTooLarge extends R2Error { constructor() { super( - Status.BadRequest, + 400, "Your proposed upload exceeds the maximum allowed object size.", - CfCode.EntityTooLarge + R2ErrorCode.ENTITY_TOO_LARGE ); } } @@ -117,9 +106,9 @@ export class EntityTooLarge extends R2Error { export class EntityTooSmall extends R2Error { constructor() { super( - Status.BadRequest, + 400, "Your proposed upload is smaller than the minimum allowed object size.", - CfCode.EntityTooSmall + R2ErrorCode.ENTITY_TOO_SMALL ); } } @@ -127,9 +116,9 @@ export class EntityTooSmall extends R2Error { export class MetadataTooLarge extends R2Error { constructor() { super( - Status.BadRequest, + 400, "Your metadata headers exceed the maximum allowed metadata size.", - CfCode.MetadataTooLarge + R2ErrorCode.METADATA_TOO_LARGE ); } } @@ -141,7 +130,7 @@ export class BadDigest extends R2Error { calculated: Buffer ) { super( - Status.BadRequest, + 400, [ `The ${algorithm} checksum you specified did not match what we received.`, `You provided a ${algorithm} checksum with value: ${provided.toString( @@ -149,7 +138,7 @@ export class BadDigest extends R2Error { )}`, `Actual ${algorithm} was: ${calculated.toString("hex")}`, ].join("\n"), - CfCode.BadDigest + R2ErrorCode.BAD_DIGEST ); } } @@ -157,9 +146,9 @@ export class BadDigest extends R2Error { export class InvalidObjectName extends R2Error { constructor() { super( - Status.BadRequest, + 400, "The specified object name is not valid.", - CfCode.InvalidObjectName + R2ErrorCode.INVALID_OBJECT_NAME ); } } @@ -167,9 +156,9 @@ export class InvalidObjectName extends R2Error { export class InvalidMaxKeys extends R2Error { constructor() { super( - Status.BadRequest, + 400, "MaxKeys params must be positive integer <= 1000.", - CfCode.InvalidMaxKeys + R2ErrorCode.INVALID_MAX_KEYS ); } } @@ -177,9 +166,9 @@ export class InvalidMaxKeys extends R2Error { export class NoSuchUpload extends R2Error { constructor() { super( - Status.BadRequest, + 400, "The specified multipart upload does not exist.", - CfCode.NoSuchUpload + R2ErrorCode.NO_SUCH_UPLOAD ); } } @@ -187,9 +176,9 @@ export class NoSuchUpload extends R2Error { export class InvalidPart extends R2Error { constructor() { super( - Status.BadRequest, + 400, "One or more of the specified parts could not be found.", - CfCode.InvalidPart + R2ErrorCode.INVALID_PART ); } } @@ -197,9 +186,9 @@ export class InvalidPart extends R2Error { export class PreconditionFailed extends R2Error { constructor() { super( - Status.PreconditionFailed, + 412, "At least one of the pre-conditions you specified did not hold.", - CfCode.PreconditionFailed + R2ErrorCode.PRECONDITION_FAILED ); } } @@ -207,9 +196,9 @@ export class PreconditionFailed extends R2Error { export class InvalidRange extends R2Error { constructor() { super( - Status.RangeNotSatisfiable, + 416, "The requested range is not satisfiable", - CfCode.InvalidRange + R2ErrorCode.INVALID_RANGE ); } } @@ -217,9 +206,9 @@ export class InvalidRange extends R2Error { export class BadUpload extends R2Error { constructor() { super( - Status.RangeNotSatisfiable, + 500, "There was a problem with the multipart upload.", - CfCode.BadUpload + R2ErrorCode.BAD_UPLOAD ); } } diff 
--git a/packages/miniflare/src/plugins/r2/r2Object.ts b/packages/miniflare/src/workers/r2/r2Object.worker.ts similarity index 90% rename from packages/miniflare/src/plugins/r2/r2Object.ts rename to packages/miniflare/src/workers/r2/r2Object.worker.ts index fbc59c681..90f1492c9 100644 --- a/packages/miniflare/src/plugins/r2/r2Object.ts +++ b/packages/miniflare/src/workers/r2/r2Object.worker.ts @@ -1,8 +1,10 @@ -import { Blob } from "buffer"; -import { ReadableStream, TransformStream } from "stream/web"; -import type { R2StringChecksums } from "@cloudflare/workers-types/experimental"; -import { HEX_REGEXP } from "../../shared"; -import { ObjectRow, R2HeadResponse, R2HttpFields, R2Range } from "./schemas"; +import { HEX_REGEXP } from "miniflare:zod"; +import { + ObjectRow, + R2HeadResponse, + R2HttpFields, + R2Range, +} from "./schemas.worker"; export interface EncodedMetadata { metadataSize: number; @@ -90,7 +92,7 @@ export class InternalR2ObjectBody extends InternalR2Object { encode(): EncodedMetadata { const { metadataSize, value: metadata } = super.encode(); - const identity = new TransformStream(); + const identity = new IdentityTransformStream(); void metadata .pipeTo(identity.writable, { preventClose: true }) .then(() => this.body.pipeTo(identity.writable)); diff --git a/packages/miniflare/src/plugins/r2/schemas.ts b/packages/miniflare/src/workers/r2/schemas.worker.ts similarity index 98% rename from packages/miniflare/src/plugins/r2/schemas.ts rename to packages/miniflare/src/workers/r2/schemas.worker.ts index 3166945a4..8b7c62bbf 100644 --- a/packages/miniflare/src/plugins/r2/schemas.ts +++ b/packages/miniflare/src/workers/r2/schemas.worker.ts @@ -1,5 +1,5 @@ -import { z } from "zod"; -import { Base64DataSchema, HexDataSchema, ValueOf } from "../../shared"; +import { ValueOf } from "miniflare:shared"; +import { Base64DataSchema, HexDataSchema, z } from "miniflare:zod"; export interface ObjectRow { key: string; diff --git a/packages/miniflare/src/plugins/r2/validator.ts b/packages/miniflare/src/workers/r2/validator.worker.ts similarity index 86% rename from packages/miniflare/src/plugins/r2/validator.ts rename to packages/miniflare/src/workers/r2/validator.worker.ts index 31578e91c..0bbe2aed0 100644 --- a/packages/miniflare/src/plugins/r2/validator.ts +++ b/packages/miniflare/src/workers/r2/validator.worker.ts @@ -1,7 +1,7 @@ -import assert from "assert"; -import type { R2StringChecksums } from "@cloudflare/workers-types/experimental"; -import { InclusiveRange } from "../../storage"; -import { _parseRanges } from "../shared"; +import assert from "node:assert"; +import { Buffer } from "node:buffer"; +import { InclusiveRange, parseRanges } from "miniflare:shared"; +import { R2Limits } from "./constants"; import { BadDigest, EntityTooLarge, @@ -10,15 +10,9 @@ import { InvalidRange, MetadataTooLarge, PreconditionFailed, -} from "./errors"; -import { InternalR2Object } from "./r2Object"; -import { InternalR2GetOptions, R2Conditional, R2Etag } from "./schemas"; - -export const MAX_LIST_KEYS = 1_000; -const MAX_KEY_SIZE = 1024; -// https://developers.cloudflare.com/r2/platform/limits/ -const MAX_VALUE_SIZE = 5_000_000_000 - 5_000_000; // 5GB - 5MB -const MAX_METADATA_SIZE = 2048; // 2048B +} from "./errors.worker"; +import { InternalR2Object } from "./r2Object.worker"; +import { InternalR2GetOptions, R2Conditional, R2Etag } from "./schemas.worker"; function identity(ms: number) { return ms; @@ -90,6 +84,7 @@ export type R2Hashes = Record< typeof R2_HASH_ALGORITHMS[number]["field"], Buffer | 
undefined >; +export type DigestAlgorithm = typeof R2_HASH_ALGORITHMS[number]["name"]; function serialisedLength(x: string) { // Adapted from internal R2 gateway implementation @@ -100,12 +95,15 @@ function serialisedLength(x: string) { } export class Validator { - hash(digests: Map, hashes: R2Hashes): R2StringChecksums { + hash( + digests: Map, + hashes: R2Hashes + ): R2StringChecksums { const checksums: R2StringChecksums = {}; for (const { name, field } of R2_HASH_ALGORITHMS) { const providedHash = hashes[field]; if (providedHash !== undefined) { - const computedHash = digests.get(field); + const computedHash = digests.get(name); // Should've computed all required digests assert(computedHash !== undefined); if (!providedHash.equals(computedHash)) { @@ -134,7 +132,7 @@ export class Validator { size: number ): InclusiveRange | undefined { if (options.rangeHeader !== undefined) { - const ranges = _parseRanges(options.rangeHeader, size); + const ranges = parseRanges(options.rangeHeader, size); // If the header contained a single range, use it. Otherwise, if the // header was invalid, or contained multiple ranges, just return the full // response (by returning undefined from this function). @@ -160,7 +158,7 @@ export class Validator { } size(size: number): Validator { - if (size > MAX_VALUE_SIZE) { + if (size > R2Limits.MAX_VALUE_SIZE) { throw new EntityTooLarge(); } return this; @@ -172,7 +170,7 @@ export class Validator { for (const [key, value] of Object.entries(customMetadata)) { metadataLength += serialisedLength(key) + serialisedLength(value); } - if (metadataLength > MAX_METADATA_SIZE) { + if (metadataLength > R2Limits.MAX_METADATA_SIZE) { throw new MetadataTooLarge(); } return this; @@ -180,14 +178,14 @@ export class Validator { key(key: string): Validator { const keyLength = Buffer.byteLength(key); - if (keyLength >= MAX_KEY_SIZE) { + if (keyLength >= R2Limits.MAX_KEY_SIZE) { throw new InvalidObjectName(); } return this; } limit(limit?: number): Validator { - if (limit !== undefined && (limit < 1 || limit > MAX_LIST_KEYS)) { + if (limit !== undefined && (limit < 1 || limit > R2Limits.MAX_LIST_KEYS)) { throw new InvalidMaxKeys(); } return this; diff --git a/packages/miniflare/test/fixtures/r2/validator.ts b/packages/miniflare/test/fixtures/r2/validator.ts new file mode 100644 index 000000000..19e0fd98b --- /dev/null +++ b/packages/miniflare/test/fixtures/r2/validator.ts @@ -0,0 +1,175 @@ +import assert from "node:assert"; +import type { InternalR2Object } from "../../../src/workers/r2/r2Object.worker"; +import type { R2Conditional } from "../../../src/workers/r2/schemas.worker"; +import { _testR2Conditional } from "../../../src/workers/r2/validator.worker"; +import { createTestHandler } from "../worker-test"; + +function test() { + // Adapted from internal R2 gateway tests + const etag = "test"; + const badEtag = "not-test"; + + const uploadedDate = new Date("2023-02-24T00:09:00.500Z"); + const pastDate = new Date(uploadedDate.getTime() - 30_000); + const futureDate = new Date(uploadedDate.getTime() + 30_000); + + const metadata: Pick = { + etag, + uploaded: uploadedDate.getTime(), + }; + + const using = (cond: R2Conditional) => _testR2Conditional(cond, metadata); + const usingMissing = (cond: R2Conditional) => _testR2Conditional(cond); + + // Check single conditions + assert(using({ etagMatches: [{ type: "strong", value: etag }] })); + assert(!using({ etagMatches: [{ type: "strong", value: badEtag }] })); + + assert(using({ etagDoesNotMatch: [{ type: "strong", value: badEtag }] })); 
+ assert(!using({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); + + assert(!using({ uploadedBefore: pastDate })); + assert(using({ uploadedBefore: futureDate })); + + assert(using({ uploadedAfter: pastDate })); + assert(!using({ uploadedBefore: pastDate })); + + // Check with weaker etags + assert(!using({ etagMatches: [{ type: "weak", value: etag }] })); + assert(!using({ etagDoesNotMatch: [{ type: "weak", value: etag }] })); + assert(using({ etagDoesNotMatch: [{ type: "weak", value: badEtag }] })); + assert(using({ etagMatches: [{ type: "wildcard" }] })); + assert(!using({ etagDoesNotMatch: [{ type: "wildcard" }] })); + + // Check multiple conditions that evaluate to false + assert( + !using({ + etagMatches: [{ type: "strong", value: etag }], + etagDoesNotMatch: [{ type: "strong", value: etag }], + }) + ); + assert( + !using({ + etagMatches: [{ type: "strong", value: etag }], + uploadedAfter: futureDate, + }) + ); + assert( + !using({ + // `etagMatches` pass makes `uploadedBefore` pass, but `uploadedAfter` fails + etagMatches: [{ type: "strong", value: etag }], + uploadedAfter: futureDate, + uploadedBefore: pastDate, + }) + ); + assert( + !using({ + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedBefore: pastDate, + }) + ); + assert( + !using({ + // `etagDoesNotMatch` pass makes `uploadedAfter` pass, but `uploadedBefore` fails + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: futureDate, + uploadedBefore: pastDate, + }) + ); + assert( + !using({ + etagMatches: [{ type: "strong", value: badEtag }], + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: pastDate, + uploadedBefore: futureDate, + }) + ); + + // Check multiple conditions that evaluate to true + assert( + using({ + etagMatches: [{ type: "strong", value: etag }], + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + }) + ); + // `etagMatches` pass makes `uploadedBefore` pass + assert( + using({ + etagMatches: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + // `etagDoesNotMatch` pass makes `uploadedAfter` pass + assert( + using({ + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: futureDate, + }) + ); + assert( + using({ + // `etagMatches` pass makes `uploadedBefore` pass + etagMatches: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + // `etagDoesNotMatch` pass makes `uploadedAfter` pass + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: futureDate, + }) + ); + assert( + using({ + uploadedBefore: futureDate, + // `etagDoesNotMatch` pass makes `uploadedAfter` pass + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: futureDate, + }) + ); + assert( + using({ + uploadedAfter: pastDate, + // `etagMatches` pass makes `uploadedBefore` pass + etagMatches: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + + // Check missing metadata fails with either `etagMatches` and `uploadedAfter` + assert(!usingMissing({ etagMatches: [{ type: "strong", value: etag }] })); + assert(!usingMissing({ uploadedAfter: pastDate })); + assert( + !usingMissing({ + etagMatches: [{ type: "strong", value: etag }], + uploadedAfter: pastDate, + }) + ); + assert(usingMissing({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); + assert(usingMissing({ uploadedBefore: pastDate })); + assert( + usingMissing({ + etagDoesNotMatch: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + assert( + !usingMissing({ + etagMatches: [{ 
type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + assert( + !usingMissing({ + etagDoesNotMatch: [{ type: "strong", value: etag }], + uploadedAfter: pastDate, + }) + ); + + // Check with second granularity + const justPastDate = new Date(uploadedDate.getTime() - 250); + const justFutureDate = new Date(uploadedDate.getTime() + 250); + assert(using({ uploadedAfter: justPastDate })); + assert(!using({ uploadedAfter: justPastDate, secondsGranularity: true })); + assert(using({ uploadedBefore: justFutureDate })); + assert(!using({ uploadedBefore: justFutureDate, secondsGranularity: true })); +} + +export default createTestHandler(test); diff --git a/packages/miniflare/test/plugins/r2/index.spec.ts b/packages/miniflare/test/plugins/r2/index.spec.ts index 812aef9e4..9fb049135 100644 --- a/packages/miniflare/test/plugins/r2/index.spec.ts +++ b/packages/miniflare/test/plugins/r2/index.spec.ts @@ -2,44 +2,54 @@ import assert from "assert"; import crypto from "crypto"; -import path from "path"; +import fs from "fs/promises"; import { text } from "stream/consumers"; import type { R2Bucket, R2Conditional, R2ListOptions, R2Object, + R2ObjectBody, R2Objects, } from "@cloudflare/workers-types/experimental"; import { Macro, ThrowsExpectation } from "ava"; import { Headers, + Miniflare, MiniflareOptions, - MultipartPartRow, - ObjectRow, - R2Gateway, + R2_PLUGIN_NAME, ReplaceWorkersTypes, - Storage, - TypedDatabase, - createFileStorage, } from "miniflare"; +import type { + MultipartPartRow, + ObjectRow, +} from "../../../src/workers/r2/schemas.worker"; import { + MiniflareDurableObjectControlStub, MiniflareTestContext, + Namespaced, isWithin, miniflareTest, + namespace, useTmp, } from "../../test-shared"; const WITHIN_EPSILON = 10_000; -function sqlStmts(db: TypedDatabase) { +function sqlStmts(object: MiniflareDurableObjectControlStub) { return { - getObjectByKey: db.prepare( - "SELECT * FROM _mf_objects WHERE key = ?" - ), - getPartsByUploadId: db.prepare( - "SELECT * FROM _mf_multipart_parts WHERE upload_id = ? ORDER BY part_number" - ), + getObjectByKey: async (key: string): Promise => + ( + await object.sqlQuery( + "SELECT * FROM _mf_objects WHERE key = ?", + key + ) + )[0], + getPartsByUploadId: (uploadId: string) => + object.sqlQuery( + "SELECT * FROM _mf_multipart_parts WHERE upload_id = ? 
ORDER BY part_number", + uploadId + ), }; } @@ -47,41 +57,10 @@ function hash(value: string, algorithm = "md5") { return crypto.createHash(algorithm).update(value).digest("hex"); } -type NamespacedR2Bucket = ReplaceWorkersTypes & { ns: string }; - -// Automatically prefix all keys with the specified namespace -function nsBucket( - ns: string, - bucket: ReplaceWorkersTypes -): NamespacedR2Bucket { - return new Proxy(bucket as NamespacedR2Bucket, { - get(target, key, receiver) { - if (key === "ns") return ns; - const value = Reflect.get(target, key, receiver); - if (typeof value === "function" && key !== "list") { - return (keys: string | string[], ...args: unknown[]) => { - if (typeof keys === "string") keys = ns + keys; - if (Array.isArray(keys)) keys = keys.map((key) => ns + key); - return value(keys, ...args); - }; - } - return value; - }, - set(target, key, newValue, receiver) { - if (key === "ns") { - ns = newValue; - return true; - } else { - return Reflect.set(target, key, newValue, receiver); - } - }, - }); -} - interface Context extends MiniflareTestContext { ns: string; - r2: NamespacedR2Bucket; - storage: Storage; + r2: Namespaced>; + object: MiniflareDurableObjectControlStub; } const opts: Partial = { @@ -102,15 +81,25 @@ test.beforeEach(async (t) => { Math.random() * Number.MAX_SAFE_INTEGER )}`; t.context.ns = ns; - t.context.r2 = nsBucket(ns, await t.context.mf.getR2Bucket("BUCKET")); - t.context.storage = t.context.mf._getPluginStorage("r2", "bucket"); + t.context.r2 = namespace(ns, await t.context.mf.getR2Bucket("BUCKET")); + + // Enable fake timers + const objectNamespace = await t.context.mf._getInternalDurableObjectNamespace( + R2_PLUGIN_NAME, + "r2:bucket", + "R2BucketObject" + ); + const objectId = objectNamespace.idFromName("bucket"); + const objectStub = objectNamespace.get(objectId); + t.context.object = new MiniflareDurableObjectControlStub(objectStub); + await t.context.object.enableFakeTimers(1_000_000); }); const validatesKeyMacro: Macro< [ { method: string; - f: (r2: NamespacedR2Bucket, key?: any) => Promise; + f: (r2: ReplaceWorkersTypes, key?: any) => Promise; } ], Context @@ -169,7 +158,6 @@ test("head: returns metadata for existing keys", async (t) => { // Test proxying of `writeHttpMetadata()` const headers = new Headers({ "X-Key": "value" }); - // noinspection JSVoidFunctionReturnValueUsed t.is(object.writeHttpMetadata(headers), undefined); t.is(headers.get("Content-Type"), "text/plain"); t.is(headers.get("X-Key"), "value"); @@ -218,7 +206,6 @@ test("get: returns metadata and body for existing keys", async (t) => { // Test proxying of `writeHttpMetadata()` const headers = new Headers({ "X-Key": "value" }); - // noinspection JSVoidFunctionReturnValueUsed t.is(body.writeHttpMetadata(headers), undefined); t.is(headers.get("Content-Type"), "text/plain"); t.is(headers.get("X-Key"), "value"); @@ -388,10 +375,10 @@ test("put: returns metadata for created object", async (t) => { isWithin(t, WITHIN_EPSILON, object.uploaded.getTime(), start); }); test("put: overrides existing keys", async (t) => { - const { r2, ns, storage, timers } = t.context; + const { r2, ns, object } = t.context; await r2.put("key", "value1"); - const stmts = sqlStmts(storage.db); - const objectRow = stmts.getObjectByKey.get(`${ns}key`); + const stmts = sqlStmts(object); + const objectRow = await stmts.getObjectByKey(`${ns}key`); assert(objectRow?.blob_id != null); await r2.put("key", "value2"); @@ -400,10 +387,13 @@ test("put: overrides existing keys", async (t) => { t.is(await body.text(), 
"value2"); // Check deletes old blob - await timers.waitForTasks(); - t.is(await storage.blob.get(objectRow.blob_id), null); + await object.waitForFakeTasks(); + t.is(await object.getBlob(objectRow.blob_id), null); +}); +test(validatesKeyMacro, { + method: "put", + f: (r2, key) => r2.put(key, "v"), }); -test(validatesKeyMacro, { method: "put", f: (r2, key) => r2.put(key, "v") }); test("put: validates checksums", async (t) => { const { r2 } = t.context; const expectations = ( @@ -494,7 +484,7 @@ test("put: stores only if passes onlyIf", async (t) => { }; const fail = async (cond: R2Conditional) => { const object = await r2.put("key", "2", { onlyIf: cond }); - t.is(object as ReplaceWorkersTypes | null, null); + t.is(object as R2Object | null, null); t.is(await (await r2.get("key"))?.text(), "1"); // No `reset()` as we've just checked we didn't update anything }; @@ -551,22 +541,22 @@ test("put: validates metadata size", async (t) => { }); test("delete: deletes existing keys", async (t) => { - const { r2, ns, storage, timers } = t.context; + const { r2, ns, object } = t.context; // Check does nothing with non-existent key await r2.delete("key"); // Check deletes single key await r2.put("key", "value"); - const stmts = sqlStmts(storage.db); - const objectRow = stmts.getObjectByKey.get(`${ns}key`); + const stmts = sqlStmts(object); + const objectRow = await stmts.getObjectByKey(`${ns}key`); assert(objectRow?.blob_id != null); t.not(await r2.head("key"), null); await r2.delete("key"); t.is(await r2.head("key"), null); // Check deletes old blob - await timers.waitForTasks(); - t.is(await storage.blob.get(objectRow.blob_id), null); + await object.waitForFakeTasks(); + t.is(await object.getBlob(objectRow.blob_id), null); // Check deletes multiple keys, skipping non-existent keys await r2.put("key1", "value1"); @@ -577,7 +567,10 @@ test("delete: deletes existing keys", async (t) => { t.not(await r2.head("key2"), null); t.is(await r2.head("key3"), null); }); -test(validatesKeyMacro, { method: "delete", f: (r2, key) => r2.delete(key) }); +test(validatesKeyMacro, { + method: "delete", + f: (r2, key) => r2.delete(key), +}); test("delete: validates keys", validatesKeyMacro, { method: "delete", f: (r2, key) => r2.delete(["valid key", key]), @@ -845,9 +838,9 @@ test("list: returns correct delimitedPrefixes for delimiter and prefix", async ( const allKeys = Object.keys(values); for (const [key, value] of Object.entries(values)) await r2.put(key, value); - const keys = (result: ReplaceWorkersTypes) => + const keys = (result: R2Objects) => result.objects.map(({ key }) => key.substring(ns.length)); - const delimitedPrefixes = (result: ReplaceWorkersTypes) => + const delimitedPrefixes = (result: R2Objects) => result.delimitedPrefixes.map((prefix) => prefix.substring(ns.length)); const allKeysWithout = (...exclude: string[]) => allKeys.filter((value) => !exclude.includes(value)); @@ -922,30 +915,36 @@ test.serial("operations permit empty key", async (t) => { t.is(await r2.head(""), null); }); -test.serial("operations persist stored data", async (t) => { - const { mf, ns } = t.context; - - // Create new temporary file-system persistence directory +test("operations persist stored data", async (t) => { const tmp = await useTmp(t); - const storage = createFileStorage(path.join(tmp, "bucket")); - - // Set option, then reset after test - await t.context.setOptions({ ...opts, r2Persist: tmp }); - t.teardown(() => t.context.setOptions(opts)); - const r2 = nsBucket(ns, await mf.getR2Bucket("BUCKET")); + const 
persistOpts: MiniflareOptions = { + verbose: true, + modules: true, + script: "", + r2Buckets: { BUCKET: "bucket" }, + r2Persist: tmp, + }; + let mf = new Miniflare(persistOpts); + t.teardown(() => mf.dispose()); + let r2 = await mf.getR2Bucket("BUCKET"); // Check put respects persist await r2.put("key", "value"); - const stmtListByNs = storage.db.prepare<{ ns: string }, { key: string }>( - "SELECT key FROM _mf_objects WHERE key LIKE :ns || '%'" - ); - let stored = stmtListByNs.all({ ns }); - t.deepEqual(stored, [{ key: `${ns}key` }]); // Check head respects persist let object = await r2.head("key"); t.is(object?.size, 5); + // Check directory created for namespace + const names = await fs.readdir(tmp); + t.true(names.includes("miniflare-R2BucketObject")); + + // Check "restarting" keeps persisted data + await mf.dispose(); + mf = new Miniflare(persistOpts); + await mf.ready; + r2 = await mf.getR2Bucket("BUCKET"); + // Check get respects persist const objectBody = await r2.get("key"); t.is(await objectBody?.text(), "value"); @@ -957,16 +956,16 @@ test.serial("operations persist stored data", async (t) => { // Check delete respects persist await r2.delete("key"); - stored = stmtListByNs.all({ ns }); - t.deepEqual(stored, []); + object = await r2.head("key"); + t.is(object, null); // Check multipart operations respect persist const upload = await r2.createMultipartUpload("multipart"); const part = await upload.uploadPart(1, "multipart"); object = await upload.complete([part]); t.is(object?.size, 9); - stored = stmtListByNs.all({ ns }); - t.deepEqual(stored, [{ key: `${ns}multipart` }]); + object = await r2.head("multipart"); + t.not(object, null); }); test.serial("operations permit strange bucket names", async (t) => { @@ -976,23 +975,16 @@ test.serial("operations permit strange bucket names", async (t) => { const id = "my/ Bucket"; await t.context.setOptions({ ...opts, r2Buckets: { BUCKET: id } }); t.teardown(() => t.context.setOptions(opts)); - const r2 = nsBucket(ns, await mf.getR2Bucket("BUCKET")); + const r2 = namespace(ns, await mf.getR2Bucket("BUCKET")); // Check basic operations work await r2.put("key", "value"); const object = await r2.get("key"); t.is(await object?.text(), "value"); - - // Check stored with correct ID - const storage = t.context.mf._getPluginStorage("r2", id); - const stmts = sqlStmts(storage.db); - t.not(stmts.getObjectByKey.get(`${ns}key`), undefined); }); // Multipart tests const PART_SIZE = 50; -// Reduce the minimum multipart part size, so we can use small values for tests -test.before(() => (R2Gateway._MIN_MULTIPART_PART_SIZE = PART_SIZE)); function objectNameNotValidExpectations(method: string) { return >{ @@ -1036,7 +1028,7 @@ test("createMultipartUpload", async (t) => { ); }); test("uploadPart", async (t) => { - const { storage, r2 } = t.context; + const { r2, object } = t.context; // Check uploads parts const upload = await r2.createMultipartUpload("key"); @@ -1047,17 +1039,17 @@ test("uploadPart", async (t) => { t.is(part2.partNumber, 2); t.not(part2.etag, ""); t.not(part2.etag, part1.etag); - const stmts = sqlStmts(storage.db); - const partRows = stmts.getPartsByUploadId.all(upload.uploadId); + const stmts = sqlStmts(object); + const partRows = await stmts.getPartsByUploadId(upload.uploadId); t.is(partRows.length, 2); t.is(partRows[0].part_number, 1); t.is(partRows[0].size, 6); t.is(partRows[1].part_number, 2); t.is(partRows[1].size, 9); - const value1 = await storage.blob.get(partRows[0].blob_id); + const value1 = await 
object.getBlob(partRows[0].blob_id); assert(value1 !== null); t.is(await text(value1), "value1"); - const value2 = await storage.blob.get(partRows[1].blob_id); + const value2 = await object.getBlob(partRows[1].blob_id); assert(value2 !== null); t.is(await text(value2), "value two"); @@ -1082,7 +1074,7 @@ test("uploadPart", async (t) => { await t.throwsAsync(nonExistentUpload.uploadPart(1, "value"), expectations); }); test("abortMultipartUpload", async (t) => { - const { storage, r2, timers } = t.context; + const { r2, object } = t.context; // Check deletes upload and all parts for corresponding upload const upload1 = await r2.createMultipartUpload("key"); @@ -1090,14 +1082,14 @@ test("abortMultipartUpload", async (t) => { await upload1.uploadPart(1, "value1"); await upload1.uploadPart(2, "value2"); await upload1.uploadPart(3, "value3"); - const stmts = sqlStmts(storage.db); - const parts = stmts.getPartsByUploadId.all(upload1.uploadId); + const stmts = sqlStmts(object); + const parts = await stmts.getPartsByUploadId(upload1.uploadId); t.is(parts.length, 3); await upload1.abort(); - t.is(stmts.getPartsByUploadId.all(upload1.uploadId).length, 0); + t.is((await stmts.getPartsByUploadId(upload1.uploadId)).length, 0); // Check blobs deleted - await timers.waitForTasks(); - for (const part of parts) t.is(await storage.blob.get(part.blob_id), null); + await object.waitForFakeTasks(); + for (const part of parts) t.is(await object.getBlob(part.blob_id), null); // Check cannot upload after abort let expectations = doesNotExistExpectations("uploadPart"); @@ -1125,7 +1117,7 @@ test("abortMultipartUpload", async (t) => { await t.throwsAsync(nonExistentUpload.abort(), expectations); }); test("completeMultipartUpload", async (t) => { - const { storage, r2, ns, timers } = t.context; + const { r2, ns, object: objectStub } = t.context; // Check creates regular key with correct metadata, and returns object const upload1 = await r2.createMultipartUpload("key", { @@ -1151,8 +1143,9 @@ test("completeMultipartUpload", async (t) => { await objectBody?.text(), `${"1".repeat(PART_SIZE)}${"2".repeat(PART_SIZE)}3` ); - const stmts = sqlStmts(storage.db); - const parts = stmts.getPartsByUploadId.all(upload1.uploadId); + + const stmts = sqlStmts(objectStub); + const parts = await stmts.getPartsByUploadId(upload1.uploadId); t.is(parts.length, 3); // Check requires all but last part to be greater than 5MB @@ -1173,12 +1166,12 @@ test("completeMultipartUpload", async (t) => { t.is(object.size, 1); t.is(object.etag, "46d1741e8075da4ac72c71d8130fcb71-1"); // Check previous multipart uploads blobs deleted - await timers.waitForTasks(); - for (const part of parts) t.is(await storage.blob.get(part.blob_id), null); + await objectStub.waitForFakeTasks(); + for (const part of parts) t.is(await objectStub.getBlob(part.blob_id), null); // Check completing multiple uploads overrides existing, deleting all parts - t.is(stmts.getPartsByUploadId.all(upload1.uploadId).length, 0); - t.is(stmts.getPartsByUploadId.all(upload2.uploadId).length, 1); + t.is((await stmts.getPartsByUploadId(upload1.uploadId)).length, 0); + t.is((await stmts.getPartsByUploadId(upload2.uploadId)).length, 1); objectBody = await r2.get("key"); t.is(await objectBody?.text(), "1"); @@ -1395,18 +1388,18 @@ test("get: is multipart aware", async (t) => { // Check ranged get accessing single part const halfPartSize = Math.floor(PART_SIZE / 2); const quarterPartSize = Math.floor(PART_SIZE / 4); - object = await r2.get("key", { + object = (await r2.get("key", { range: { 
offset: halfPartSize, length: quarterPartSize }, - }); + })) as ReplaceWorkersTypes | null; t.is(await object?.text(), "a".repeat(quarterPartSize)); // Check ranged get accessing multiple parts - object = await r2.get("key", { + object = (await r2.get("key", { range: { offset: halfPartSize, length: halfPartSize + PART_SIZE + quarterPartSize, }, - }); + })) as ReplaceWorkersTypes | null; t.is( await object?.text(), `${"a".repeat(halfPartSize)}${"b".repeat(PART_SIZE)}${"c".repeat( @@ -1415,16 +1408,16 @@ test("get: is multipart aware", async (t) => { ); // Check ranged get of suffix - object = await r2.get("key", { + object = (await r2.get("key", { range: { suffix: quarterPartSize + PART_SIZE }, - }); + })) as ReplaceWorkersTypes | null; t.is( await object?.text(), `${"b".repeat(quarterPartSize)}${"c".repeat(PART_SIZE)}` ); }); test("put: is multipart aware", async (t) => { - const { storage, r2, timers } = t.context; + const { r2, object: objectStub } = t.context; // Check doesn't overwrite parts for in-progress multipart upload const upload = await r2.createMultipartUpload("key"); @@ -1433,23 +1426,23 @@ test("put: is multipart aware", async (t) => { const part3 = await upload.uploadPart(3, "3".repeat(PART_SIZE)); await r2.put("key", "value"); - const stmts = sqlStmts(storage.db); - t.is(stmts.getPartsByUploadId.all(upload.uploadId).length, 3); + const stmts = sqlStmts(objectStub); + t.is((await stmts.getPartsByUploadId(upload.uploadId)).length, 3); const object = await upload.complete([part1, part2, part3]); t.is(object.size, 3 * PART_SIZE); - const parts = stmts.getPartsByUploadId.all(upload.uploadId); + const parts = await stmts.getPartsByUploadId(upload.uploadId); t.is(parts.length, 3); // Check overwrites all multipart parts of completed upload await r2.put("key", "new-value"); - t.is(stmts.getPartsByUploadId.all(upload.uploadId).length, 0); + t.is((await stmts.getPartsByUploadId(upload.uploadId)).length, 0); // Check deletes all previous blobs - await timers.waitForTasks(); - for (const part of parts) t.is(await storage.blob.get(part.blob_id), null); + await objectStub.waitForFakeTasks(); + for (const part of parts) t.is(await objectStub.getBlob(part.blob_id), null); }); test("delete: is multipart aware", async (t) => { - const { storage, r2, timers } = t.context; + const { r2, object: objectStub } = t.context; // Check doesn't remove parts for in-progress multipart upload const upload = await r2.createMultipartUpload("key"); @@ -1461,17 +1454,17 @@ test("delete: is multipart aware", async (t) => { // Check removes all multipart parts of completed upload const object = await upload.complete([part1, part2, part3]); t.is(object.size, 3 * PART_SIZE); - const stmts = sqlStmts(storage.db); - const parts = stmts.getPartsByUploadId.all(upload.uploadId); + const stmts = sqlStmts(objectStub); + const parts = await stmts.getPartsByUploadId(upload.uploadId); t.is(parts.length, 3); await r2.delete("key"); - t.is(stmts.getPartsByUploadId.all(upload.uploadId).length, 0); + t.is((await stmts.getPartsByUploadId(upload.uploadId)).length, 0); // Check deletes all previous blobs - await timers.waitForTasks(); - for (const part of parts) t.is(await storage.blob.get(part.blob_id), null); + await objectStub.waitForFakeTasks(); + for (const part of parts) t.is(await objectStub.getBlob(part.blob_id), null); }); test("delete: waits for in-progress multipart gets before deleting part blobs", async (t) => { - const { storage, r2, timers } = t.context; + const { r2, object: objectStub } = t.context; const 
upload = await r2.createMultipartUpload("key"); const part1 = await upload.uploadPart(1, "1".repeat(PART_SIZE)); @@ -1481,8 +1474,8 @@ test("delete: waits for in-progress multipart gets before deleting part blobs", const objectBody1 = await r2.get("key"); const objectBody2 = await r2.get("key", { range: { offset: PART_SIZE } }); - const stmts = sqlStmts(storage.db); - const parts = stmts.getPartsByUploadId.all(upload.uploadId); + const stmts = sqlStmts(objectStub); + const parts = await stmts.getPartsByUploadId(upload.uploadId); t.is(parts.length, 3); await r2.delete("key"); t.is( @@ -1494,8 +1487,8 @@ test("delete: waits for in-progress multipart gets before deleting part blobs", `${"2".repeat(PART_SIZE)}${"3".repeat(PART_SIZE)}` ); - await timers.waitForTasks(); - for (const part of parts) t.is(await storage.blob.get(part.blob_id), null); + await objectStub.waitForFakeTasks(); + for (const part of parts) t.is(await objectStub.getBlob(part.blob_id), null); }); test("list: is multipart aware", async (t) => { const { r2, ns } = t.context; diff --git a/packages/miniflare/test/plugins/r2/validator.spec.ts b/packages/miniflare/test/plugins/r2/validator.spec.ts index da1e881b4..7ee5f19e1 100644 --- a/packages/miniflare/test/plugins/r2/validator.spec.ts +++ b/packages/miniflare/test/plugins/r2/validator.spec.ts @@ -1,170 +1,9 @@ import test from "ava"; -import { InternalR2Object, R2Conditional, _testR2Conditional } from "miniflare"; - -test("testR2Conditional: matches various conditions", (t) => { - // Adapted from internal R2 gateway tests - const etag = "test"; - const badEtag = "not-test"; - - const uploadedDate = new Date("2023-02-24T00:09:00.500Z"); - const pastDate = new Date(uploadedDate.getTime() - 30_000); - const futureDate = new Date(uploadedDate.getTime() + 30_000); - - const metadata: Pick = { - etag, - uploaded: uploadedDate.getTime(), - }; - - const using = (cond: R2Conditional) => _testR2Conditional(cond, metadata); - const usingMissing = (cond: R2Conditional) => _testR2Conditional(cond); - - // Check single conditions - t.true(using({ etagMatches: [{ type: "strong", value: etag }] })); - t.false(using({ etagMatches: [{ type: "strong", value: badEtag }] })); - - t.true(using({ etagDoesNotMatch: [{ type: "strong", value: badEtag }] })); - t.false(using({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); - - t.false(using({ uploadedBefore: pastDate })); - t.true(using({ uploadedBefore: futureDate })); - - t.true(using({ uploadedAfter: pastDate })); - t.false(using({ uploadedBefore: pastDate })); - - // Check with weaker etags - t.false(using({ etagMatches: [{ type: "weak", value: etag }] })); - t.false(using({ etagDoesNotMatch: [{ type: "weak", value: etag }] })); - t.true(using({ etagDoesNotMatch: [{ type: "weak", value: badEtag }] })); - t.true(using({ etagMatches: [{ type: "wildcard" }] })); - t.false(using({ etagDoesNotMatch: [{ type: "wildcard" }] })); - - // Check multiple conditions that evaluate to false - t.false( - using({ - etagMatches: [{ type: "strong", value: etag }], - etagDoesNotMatch: [{ type: "strong", value: etag }], - }) - ); - t.false( - using({ - etagMatches: [{ type: "strong", value: etag }], - uploadedAfter: futureDate, - }) - ); - t.false( - using({ - // `etagMatches` pass makes `uploadedBefore` pass, but `uploadedAfter` fails - etagMatches: [{ type: "strong", value: etag }], - uploadedAfter: futureDate, - uploadedBefore: pastDate, - }) - ); - t.false( - using({ - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedBefore: pastDate, - 
}) - ); - t.false( - using({ - // `etagDoesNotMatch` pass makes `uploadedAfter` pass, but `uploadedBefore` fails - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedAfter: futureDate, - uploadedBefore: pastDate, - }) - ); - t.false( - using({ - etagMatches: [{ type: "strong", value: badEtag }], - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedAfter: pastDate, - uploadedBefore: futureDate, - }) - ); - - // Check multiple conditions that evaluate to true - t.true( - using({ - etagMatches: [{ type: "strong", value: etag }], - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - }) - ); - // `etagMatches` pass makes `uploadedBefore` pass - t.true( - using({ - etagMatches: [{ type: "strong", value: etag }], - uploadedBefore: pastDate, - }) - ); - // `etagDoesNotMatch` pass makes `uploadedAfter` pass - t.true( - using({ - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedAfter: futureDate, - }) - ); - t.true( - using({ - // `etagMatches` pass makes `uploadedBefore` pass - etagMatches: [{ type: "strong", value: etag }], - uploadedBefore: pastDate, - // `etagDoesNotMatch` pass makes `uploadedAfter` pass - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedAfter: futureDate, - }) - ); - t.true( - using({ - uploadedBefore: futureDate, - // `etagDoesNotMatch` pass makes `uploadedAfter` pass - etagDoesNotMatch: [{ type: "strong", value: badEtag }], - uploadedAfter: futureDate, - }) - ); - t.true( - using({ - uploadedAfter: pastDate, - // `etagMatches` pass makes `uploadedBefore` pass - etagMatches: [{ type: "strong", value: etag }], - uploadedBefore: pastDate, - }) - ); - - // Check missing metadata fails with either `etagMatches` and `uploadedAfter` - t.false(usingMissing({ etagMatches: [{ type: "strong", value: etag }] })); - t.false(usingMissing({ uploadedAfter: pastDate })); - t.false( - usingMissing({ - etagMatches: [{ type: "strong", value: etag }], - uploadedAfter: pastDate, - }) - ); - t.true(usingMissing({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); - t.true(usingMissing({ uploadedBefore: pastDate })); - t.true( - usingMissing({ - etagDoesNotMatch: [{ type: "strong", value: etag }], - uploadedBefore: pastDate, - }) - ); - t.false( - usingMissing({ - etagMatches: [{ type: "strong", value: etag }], - uploadedBefore: pastDate, - }) - ); - t.false( - usingMissing({ - etagDoesNotMatch: [{ type: "strong", value: etag }], - uploadedAfter: pastDate, - }) - ); - - // Check with second granularity - const justPastDate = new Date(uploadedDate.getTime() - 250); - const justFutureDate = new Date(uploadedDate.getTime() + 250); - t.true(using({ uploadedAfter: justPastDate })); - t.false(using({ uploadedAfter: justPastDate, secondsGranularity: true })); - t.true(using({ uploadedBefore: justFutureDate })); - t.false(using({ uploadedBefore: justFutureDate, secondsGranularity: true })); -}); +import { workerTestMacro } from "../../test-shared"; + +test( + "testR2Conditional: matches various conditions", + workerTestMacro, + "r2", + "validator.ts" +);
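
Note for reviewers: the "operations persist stored data" test above now exercises persistence by creating a real second Miniflare instance against the same `r2Persist` directory, rather than inspecting the SQLite database directly. The following is a minimal sketch of that usage pattern, assuming the same options the test uses (`modules: true`, an empty `script`, `r2Buckets: { BUCKET: "bucket" }`, and an `r2Persist` path); the temporary-directory helper and the assertion are illustrative only, not part of this change.

import assert from "node:assert";
import { mkdtemp } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { Miniflare, MiniflareOptions } from "miniflare";

async function main() {
	// Persist R2 data under a local directory so it survives "restarts"
	const persistPath = await mkdtemp(path.join(tmpdir(), "mf-r2-"));
	const opts: MiniflareOptions = {
		modules: true,
		script: "",
		r2Buckets: { BUCKET: "bucket" },
		r2Persist: persistPath,
	};

	let mf = new Miniflare(opts);
	let bucket = await mf.getR2Bucket("BUCKET");
	await bucket.put("key", "value");

	// "Restart" Miniflare: dispose the old instance and create a new one
	// pointing at the same persist directory
	await mf.dispose();
	mf = new Miniflare(opts);
	await mf.ready;
	bucket = await mf.getR2Bucket("BUCKET");

	// The object written before the restart is still readable
	const body = await bucket.get("key");
	assert.strictEqual(await body?.text(), "value");

	await mf.dispose();
}

void main();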