From c361e337e91225cb61f62715f0844a950876f261 Mon Sep 17 00:00:00 2001 From: Jonas Jongejan Date: Mon, 6 Nov 2023 20:25:03 -0500 Subject: [PATCH 1/3] fix(python backoffs lookup bug): fix bug when using default backoff type When a builtin backoff type is used, the lookup was expecting and object with a property type, but the passed value from the job is a dict. This fixes the lookup and adds a test for it. --- python/bullmq/backoffs.py | 4 ++-- python/tests/worker_tests.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/python/bullmq/backoffs.py b/python/bullmq/backoffs.py index 59ea080eb0..fedaf15cb7 100644 --- a/python/bullmq/backoffs.py +++ b/python/bullmq/backoffs.py @@ -31,8 +31,8 @@ async def calculate(backoff: BackoffOptions, attempts_made: int, err, job, custo def lookup_strategy(backoff: BackoffOptions, custom_strategy): backoff_type = backoff.get("type") if backoff_type in Backoffs.builtin_strategies: - Backoffs.builtin_strategies[backoff.type](backoff.delay) + Backoffs.builtin_strategies[backoff_type](backoff.delay) elif custom_strategy: return custom_strategy else: - raise Exception(f"Unknown backoff strategy {backoff_type}.If a custom backoff strategy is used, specify it when the queue is created.") + raise Exception(f"Unknown backoff strategy {backoff_type}. If a custom backoff strategy is used, specify it when the queue is created.") diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index 31a5c149f8..be0d14065e 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -209,6 +209,34 @@ async def process2(job: Job, token: str): await worker2.close(force=True) await queue.close() + async def test_retry_job_after_delay_with_fixed_backoff(self): + queue = Queue(queueName) + + async def process1(job: Job, token: str): + if job.attemptsMade < 3: + raise Exception("Not yet!") + return None + + worker = Worker(queueName, process1) + + start = round(time.time() * 1000) + await queue.add("test", { "foo": "bar" }, + {"attempts": 3, "backoff": {"type": "fixed", "delay": 3000}}) + + completed_events = Future() + + def completing(job: Job, result): + elapse = round(time.time() * 1000) - start + self.assertGreater(elapse, 3000) + completed_events.set_result(None) + + worker.on("completed", completing) + + await completed_events + + await queue.close() + await worker.close() + async def test_retry_job_after_delay_with_custom_backoff(self): queue = Queue(queueName) From 76ea02e6e92ec3e115588c8d20db1ab8ec9701ec Mon Sep 17 00:00:00 2001 From: Jonas Jongejan Date: Tue, 7 Nov 2023 10:43:49 -0500 Subject: [PATCH 2/3] fix(backoff): lookup delay using .get Delay lookup from backoff was done using .delay, but backoff is a dict so needs to be done using .get --- python/bullmq/backoffs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/bullmq/backoffs.py b/python/bullmq/backoffs.py index fedaf15cb7..700627263c 100644 --- a/python/bullmq/backoffs.py +++ b/python/bullmq/backoffs.py @@ -31,7 +31,7 @@ async def calculate(backoff: BackoffOptions, attempts_made: int, err, job, custo def lookup_strategy(backoff: BackoffOptions, custom_strategy): backoff_type = backoff.get("type") if backoff_type in Backoffs.builtin_strategies: - Backoffs.builtin_strategies[backoff_type](backoff.delay) + Backoffs.builtin_strategies[backoff_type](backoff.get("delay")) elif custom_strategy: return custom_strategy else: From c611fc75bd4785d504938b27045f5cb176f390be Mon Sep 17 00:00:00 2001 From: Jonas Jongejan Date: Tue, 7 Nov 2023 16:47:21 -0500 Subject: [PATCH 3/3] fix(python backoff): return backoff strategy in lookup The builtin strategy was not returned from lookup_strategy --- python/bullmq/backoffs.py | 3 +-- python/tests/worker_tests.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/python/bullmq/backoffs.py b/python/bullmq/backoffs.py index 700627263c..49e0ca432d 100644 --- a/python/bullmq/backoffs.py +++ b/python/bullmq/backoffs.py @@ -24,14 +24,13 @@ def normalize(backoff: int | BackoffOptions): async def calculate(backoff: BackoffOptions, attempts_made: int, err, job, customStrategy): if backoff: strategy = lookup_strategy(backoff, customStrategy) - return strategy(attempts_made, backoff.get("type"), err, job) def lookup_strategy(backoff: BackoffOptions, custom_strategy): backoff_type = backoff.get("type") if backoff_type in Backoffs.builtin_strategies: - Backoffs.builtin_strategies[backoff_type](backoff.get("delay")) + return Backoffs.builtin_strategies[backoff_type](backoff.get("delay")) elif custom_strategy: return custom_strategy else: diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index be0d14065e..61f2084bfd 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -221,13 +221,13 @@ async def process1(job: Job, token: str): start = round(time.time() * 1000) await queue.add("test", { "foo": "bar" }, - {"attempts": 3, "backoff": {"type": "fixed", "delay": 3000}}) + {"attempts": 3, "backoff": {"type": "fixed", "delay": 1000}}) completed_events = Future() def completing(job: Job, result): elapse = round(time.time() * 1000) - start - self.assertGreater(elapse, 3000) + self.assertGreater(elapse, 2000) completed_events.set_result(None) worker.on("completed", completing)