Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue consumer setting max_batch_timeout, batch-timeout should accept 0 #5859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/old-weeks-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"wrangler": patch
---

fix: queue consumer max_batch_timeout should accept a 0 value
64 changes: 64 additions & 0 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9734,6 +9734,70 @@ export default{
`);
});

it("should support queue consumer with max_batch_timeout of 0", async () => {
writeWranglerToml({
queues: {
consumers: [
{
queue: queueName,
dead_letter_queue: "myDLQ",
max_batch_size: 5,
max_batch_timeout: 0,
max_retries: 10,
max_concurrency: null,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();

const consumerId = "consumer-id";
const existingQueue: QueueResponse = {
queue_id: queueId,
queue_name: queueName,
created_on: "",
producers: [],
consumers: [
{
type: "worker",
script: "test-name",
consumer_id: consumerId,
settings: {},
},
],
producers_total_count: 0,
consumers_total_count: 0,
modified_on: "",
};
mockGetQueueByName(queueName, existingQueue);
mockPutQueueConsumerById(queueId, queueName, consumerId, {
dead_letter_queue: "myDLQ",
type: "worker",
script_name: "test-name",
settings: {
batch_size: 5,
max_retries: 10,
max_wait_time_ms: 0,
},
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Uploaded test-name (TIMINGS)
Published test-name (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Deployment ID: Galaxy-Class
Current Version ID: Galaxy-Class


Note: Deployment ID has been renamed to Version ID. Deployment ID is present to maintain compatibility with the previous behavior of this command. This output will change in a future version of Wrangler. To learn more visit: https://developers.cloudflare.com/workers/configuration/versions-and-deployments"
`);
});

it("consumer should error when a queue doesn't exist", async () => {
writeWranglerToml({
queues: {
Expand Down
43 changes: 43 additions & 0 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,49 @@ describe("wrangler", () => {
`);
});

it("should add a consumer with batchTimeout of 0", async () => {
const queueNameResolveRequest = mockGetQueueByNameRequest(
expectedQueueName,
{
queue_id: expectedQueueId,
queue_name: expectedQueueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 1,
consumers_total_count: 0,
modified_on: "",
}
);

const expectedBody: PostTypedConsumerBody = {
script_name: "testScript",
type: "worker",
environment_name: "myEnv",
settings: {
batch_size: 20,
max_retries: 3,
max_wait_time_ms: 0,
max_concurrency: 3,
retry_delay: 10,
},
dead_letter_queue: "myDLQ",
};
const postRequest = mockPostRequest(expectedQueueId, expectedBody);

await runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 0 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10"
);

expect(queueNameResolveRequest.count).toEqual(1);
expect(postRequest.count).toEqual(1);

expect(std.out).toMatchInlineSnapshot(`
"Adding consumer to queue testQueue.
Added consumer to queue testQueue."
`);
});

it("should show an error when two retry delays are set", async () => {
const expectedBody: PostTypedConsumerBody = {
script_name: "testScript",
Expand Down
7 changes: 4 additions & 3 deletions packages/wrangler/src/deploy/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,10 @@ export async function updateQueueConsumers(
settings: {
batch_size: consumer.max_batch_size,
max_retries: consumer.max_retries,
max_wait_time_ms: consumer.max_batch_timeout
? 1000 * consumer.max_batch_timeout
: undefined,
max_wait_time_ms:
consumer.max_batch_timeout !== undefined
? 1000 * consumer.max_batch_timeout
: undefined,
max_concurrency: consumer.max_concurrency,
retry_delay: consumer.retry_delay,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ function createBody(
settings: {
batch_size: args.batchSize,
max_retries: args.messageRetries,
max_wait_time_ms: args.batchTimeout // API expects milliseconds
? 1000 * args.batchTimeout
: undefined,
max_wait_time_ms:
args.batchTimeout !== undefined // API expects milliseconds
? 1000 * args.batchTimeout
: undefined,
max_concurrency: args.maxConcurrency,
retry_delay: args.retryDelaySecs,
},
Expand Down
Loading