diff --git a/python/bullmq/backoffs.py b/python/bullmq/backoffs.py index 59ea080eb0..49e0ca432d 100644 --- a/python/bullmq/backoffs.py +++ b/python/bullmq/backoffs.py @@ -24,15 +24,14 @@ def normalize(backoff: int | BackoffOptions): async def calculate(backoff: BackoffOptions, attempts_made: int, err, job, customStrategy): if backoff: strategy = lookup_strategy(backoff, customStrategy) - return strategy(attempts_made, backoff.get("type"), err, job) def lookup_strategy(backoff: BackoffOptions, custom_strategy): backoff_type = backoff.get("type") if backoff_type in Backoffs.builtin_strategies: - Backoffs.builtin_strategies[backoff.type](backoff.delay) + return Backoffs.builtin_strategies[backoff_type](backoff.get("delay")) elif custom_strategy: return custom_strategy else: - raise Exception(f"Unknown backoff strategy {backoff_type}.If a custom backoff strategy is used, specify it when the queue is created.") + raise Exception(f"Unknown backoff strategy {backoff_type}. If a custom backoff strategy is used, specify it when the queue is created.") diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index 31a5c149f8..61f2084bfd 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -209,6 +209,34 @@ async def process2(job: Job, token: str): await worker2.close(force=True) await queue.close() + async def test_retry_job_after_delay_with_fixed_backoff(self): + queue = Queue(queueName) + + async def process1(job: Job, token: str): + if job.attemptsMade < 3: + raise Exception("Not yet!") + return None + + worker = Worker(queueName, process1) + + start = round(time.time() * 1000) + await queue.add("test", { "foo": "bar" }, + {"attempts": 3, "backoff": {"type": "fixed", "delay": 1000}}) + + completed_events = Future() + + def completing(job: Job, result): + elapse = round(time.time() * 1000) - start + self.assertGreater(elapse, 2000) + completed_events.set_result(None) + + worker.on("completed", completing) + + await completed_events + + await queue.close() + await worker.close() + async def test_retry_job_after_delay_with_custom_backoff(self): queue = Queue(queueName)