Skip to content

Commit

Permalink
fix(miniflare): Fix regression introduced in #5570
Browse files Browse the repository at this point in the history
  • Loading branch information
CarmenPopoviciu committed Sep 17, 2024
1 parent 0d26420 commit c95340e
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 4 deletions.
9 changes: 9 additions & 0 deletions .changeset/cool-beans-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"miniflare": minor
---

fix: Fix Miniflare regression introduced in #5570

PR #5570 introduced a regression in Miniflare, namely that declaring Queue Producers like `queueProducers: { "MY_QUEUE": "my-queue" }` no longer works. This commit fixes the issue.

Fixes #5908
17 changes: 15 additions & 2 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ function getQueueProducers(
if (workerProducers !== undefined) {
// De-sugar array consumer options to record mapping to empty options
if (Array.isArray(workerProducers)) {
// queueProducers: ["MY_QUEUE"]
workerProducers = Object.fromEntries(
workerProducers.map((bindingName) => [
bindingName,
Expand All @@ -428,8 +429,20 @@ function getQueueProducers(
);
}

for (const [bindingName, opts] of Object.entries(workerProducers)) {
queueProducers.set(bindingName, { workerName, ...opts });
type Entries<T> = { [K in keyof T]: [K, T[K]] }[keyof T][];
type ProducersIterable = Entries<typeof workerProducers>;
const producersIterable = Object.entries(
workerProducers
) as ProducersIterable;

for (const [bindingName, opts] of producersIterable) {
if (typeof opts === "string") {
// queueProducers: { "MY_QUEUE": "my-queue" }
queueProducers.set(bindingName, { workerName, queueName: opts });
} else {
// queueProducers: { QUEUE: { queueName: "QUEUE", ... } }
queueProducers.set(bindingName, { workerName, ...opts });
}
}
}
}
Expand Down
124 changes: 122 additions & 2 deletions packages/miniflare/test/plugins/queues/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,126 @@ test("flushes partial and full batches", async (t) => {
batches = [];
});

test("supports declaring queue producers as a key-value pair -> queueProducers: { 'MY_QUEUE_BINDING': 'my-queue_name' }", async (t) => {
const promise = new DeferredPromise<z.infer<typeof MessageArraySchema>>();
const mf = new Miniflare({
verbose: true,
queueProducers: { MY_QUEUE_PRODUCER: "MY_QUEUE" },
queueConsumers: ["MY_QUEUE"],
serviceBindings: {
async REPORTER(request) {
promise.resolve(MessageArraySchema.parse(await request.json()));
return new Response();
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
await env.MY_QUEUE_PRODUCER.send("Hello world!");
await env.MY_QUEUE_PRODUCER.sendBatch([{ body: "Hola mundo!" }]);
return new Response(null, { status: 204 });
},
async queue(batch, env, ctx) {
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
}
}`,
});
t.teardown(() => mf.dispose());
const object = await getControlStub(mf, "MY_QUEUE");

await mf.dispatchFetch("http://localhost");
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
const batch = await promise;
t.deepEqual(batch, [
{ queue: "MY_QUEUE", id: batch[0].id, body: "Hello world!", attempts: 1 },
{ queue: "MY_QUEUE", id: batch[1].id, body: "Hola mundo!", attempts: 1 },
]);
});

test("supports declaring queue producers as an array -> queueProducers: ['MY_QUEUE_BINDING']", async (t) => {
const promise = new DeferredPromise<z.infer<typeof MessageArraySchema>>();
const mf = new Miniflare({
verbose: true,
queueProducers: ["MY_QUEUE"],
queueConsumers: ["MY_QUEUE"],
serviceBindings: {
async REPORTER(request) {
promise.resolve(MessageArraySchema.parse(await request.json()));
return new Response();
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
await env.MY_QUEUE.send("Hello World!");
await env.MY_QUEUE.sendBatch([{ body: "Hola Mundo!" }]);
return new Response(null, { status: 204 });
},
async queue(batch, env, ctx) {
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
}
}`,
});
t.teardown(() => mf.dispose());
const object = await getControlStub(mf, "MY_QUEUE");

await mf.dispatchFetch("http://localhost");
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
const batch = await promise;
t.deepEqual(batch, [
{ queue: "MY_QUEUE", id: batch[0].id, body: "Hello World!", attempts: 1 },
{ queue: "MY_QUEUE", id: batch[1].id, body: "Hola Mundo!", attempts: 1 },
]);
});

test("supports declaring queue producers as {MY_QUEUE_BINDING: {queueName: 'my-queue-name'}}", async (t) => {
const promise = new DeferredPromise<z.infer<typeof MessageArraySchema>>();
const mf = new Miniflare({
verbose: true,
queueProducers: { MY_QUEUE_PRODUCER: { queueName: "MY_QUEUE" } },
queueConsumers: ["MY_QUEUE"],
serviceBindings: {
async REPORTER(request) {
promise.resolve(MessageArraySchema.parse(await request.json()));
return new Response();
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
await env.MY_QUEUE_PRODUCER.send("Hello World!");
await env.MY_QUEUE_PRODUCER.sendBatch([{ body: "Hola Mundo!" }]);
return new Response(null, { status: 204 });
},
async queue(batch, env, ctx) {
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id, body, attempts }) => ({ queue: batch.queue, id, body, attempts }))),
});
}
}`,
});
t.teardown(() => mf.dispose());
const object = await getControlStub(mf, "MY_QUEUE");

await mf.dispatchFetch("http://localhost");
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
const batch = await promise;
t.deepEqual(batch, [
{ queue: "MY_QUEUE", id: batch[0].id, body: "Hello World!", attempts: 1 },
{ queue: "MY_QUEUE", id: batch[1].id, body: "Hola Mundo!", attempts: 1 },
]);
});

test("sends all structured cloneable types", async (t) => {
const errorPromise = new DeferredPromise<string>();

Expand Down Expand Up @@ -804,9 +924,9 @@ test("supports message contentTypes", async (t) => {
test("validates message size", async (t) => {
const mf = new Miniflare({
verbose: true,
queueProducers: ["QUEUE"],
queueProducers: {QUEUE: "MY_QUEUE"},
queueConsumers: {
QUEUE: {
MY_QUEUE: {
maxBatchSize: 100,
maxBatchTimeout: 0,
},
Expand Down

0 comments on commit c95340e

Please sign in to comment.