Skip to content

Commit

Permalink
queue consumer setting max_batch_timeout, batch-timeout should accept…
Browse files Browse the repository at this point in the history
… 0 (#5859)

* Queue consumer max_batch_timeout should accept a zero value

* Queue consumer create should accept batchTimeout value of 0

* Add changeset for Queues bugfix

* Prettier formatting fixes
  • Loading branch information
w-kuhn authored May 20, 2024
1 parent 6725a19 commit f2ceb3a
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-weeks-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"wrangler": patch
---

fix: queue consumer max_batch_timeout should accept a 0 value
64 changes: 64 additions & 0 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9734,6 +9734,70 @@ export default{
`);
});

it("should support queue consumer with max_batch_timeout of 0", async () => {
writeWranglerToml({
queues: {
consumers: [
{
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 0,
max_retries: 10,
max_concurrency: null,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();

const consumerId = "consumer-id";
const existingQueue: QueueResponse = {
queue_id: queueId,
queue_name: queueName,
created_on: "",
producers: [],
consumers: [
{
type: "worker",
script: "test-name",
consumer_id: consumerId,
settings: {},
},
],
producers_total_count: 0,
consumers_total_count: 0,
modified_on: "",
};
mockGetQueueByName(queueName, existingQueue);
mockPutQueueConsumerById(queueId, queueName, consumerId, {
dead_letter_queue: "myDLQ",
type: "worker",
script_name: "test-name",
settings: {
batch_size: 5,
max_retries: 10,
max_wait_time_ms: 0,
},
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Deployment ID: Galaxy-Class
Current Version ID: Galaxy-Class
Note: Deployment ID has been renamed to Version ID. Deployment ID is present to maintain compatibility with the previous behavior of this command. This output will change in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments"
`);
});

it("consumer should error when a queue doesn't exist", async () => {
writeWranglerToml({
queues: {
Expand Down
43 changes: 43 additions & 0 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,49 @@ describe("wrangler", () => {
`);
});

it("should add a consumer with batchTimeout of 0", async () => {
const queueNameResolveRequest = mockGetQueueByNameRequest(
expectedQueueName,
{
queue_id: expectedQueueId,
queue_name: expectedQueueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
}
);

const expectedBody: PostTypedConsumerBody = {
script_name: "testScript",
type: "worker",
environment_name: "myEnv",
settings: {
batch_size: 20,
max_retries: 3,
max_wait_time_ms: 0,
max_concurrency: 3,
retry_delay: 10,
},
dead_letter_queue: "myDLQ",
};
const postRequest = mockPostRequest(expectedQueueId, expectedBody);

await runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 0 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10"
);

expect(queueNameResolveRequest.count).toEqual(1);
expect(postRequest.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`
"Adding consumer to queue testQueue.
Added consumer to queue testQueue."
`);
});

it("should show an error when two retry delays are set", async () => {
const expectedBody: PostTypedConsumerBody = {
script_name: "testScript",
Expand Down
7 changes: 4 additions & 3 deletions packages/wrangler/src/deploy/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,10 @@ export async function updateQueueConsumers(
settings: {
batch_size: consumer.max_batch_size,
max_retries: consumer.max_retries,
max_wait_time_ms: consumer.max_batch_timeout
? 1000 * consumer.max_batch_timeout
: undefined,
max_wait_time_ms:
consumer.max_batch_timeout !== undefined
? 1000 * consumer.max_batch_timeout
: undefined,
max_concurrency: consumer.max_concurrency,
retry_delay: consumer.retry_delay,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ function createBody(
settings: {
batch_size: args.batchSize,
max_retries: args.messageRetries,
max_wait_time_ms: args.batchTimeout // API expects milliseconds
? 1000 * args.batchTimeout
: undefined,
max_wait_time_ms:
args.batchTimeout !== undefined // API expects milliseconds
? 1000 * args.batchTimeout
: undefined,
max_concurrency: args.maxConcurrency,
retry_delay: args.retryDelaySecs,
},
Expand Down

0 comments on commit f2ceb3a

Please sign in to comment.