From 00cd0174539fbe1cc4628b9b6e1a7eb87a5ef705 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 9 Jul 2024 20:16:08 -0600 Subject: [PATCH] fix(parent): consider re-adding child that is in completed state using same jobIds (#2627) (python) fixes #2554 --- python/bullmq/scripts.py | 10 +-- python/bullmq/types/job_options.py | 2 +- python/bullmq/types/queue_options.py | 1 - python/bullmq/utils.py | 1 + python/bullmq/worker.py | 4 +- python/tests/bulk_tests.py | 1 + python/tests/job_tests.py | 12 ++-- python/tests/queue_tests.py | 10 +-- python/tests/worker_tests.py | 4 +- src/commands/includes/handleDuplicatedJob.lua | 2 +- tests/test_flow.ts | 72 +++++++++++++++++++ 11 files changed, 97 insertions(+), 22 deletions(-) diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 1c6406e82f..26218b109b 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -258,7 +258,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}): push_cmd = "RPUSH" if lifo else "LPUSH" args = [self.keys[''], round(time.time() * 1000), push_cmd, - job_id, token, "1" if opts.get("skipAttempt") else "0"] + job_id, token, "1" if opts.get("skipAttempt") else "0"] return (keys, args) @@ -275,7 +275,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int keys.append(self.keys['stalled']) args = [self.keys[''], round(time.time() * 1000), str(max_timestamp), - job_id, token, delay, "1" if opts.get("skipAttempt") else "0"] + job_id, token, delay, "1" if opts.get("skipAttempt") else "0"] return (keys, args) @@ -325,9 +325,9 @@ def getCounts(self, types): def getCountsPerPriorityArgs(self, priorities): keys = [self.keys['wait'], - self.keys['paused'], - self.keys['meta'], - self.keys['prioritized']] + self.keys['paused'], + self.keys['meta'], + self.keys['prioritized']] args = priorities diff --git a/python/bullmq/types/job_options.py b/python/bullmq/types/job_options.py index 9cb617d477..f0d3fb930d 100644 --- a/python/bullmq/types/job_options.py +++ b/python/bullmq/types/job_options.py @@ -7,7 +7,7 @@ class JobOptions(TypedDict, total=False): """ Backoff setting for automatic retries if the job fails. """ - + jobId: str """ Override the job ID - by default, the job ID is a unique diff --git a/python/bullmq/types/queue_options.py b/python/bullmq/types/queue_options.py index 70ed1c92d4..19f21f9186 100644 --- a/python/bullmq/types/queue_options.py +++ b/python/bullmq/types/queue_options.py @@ -13,4 +13,3 @@ class QueueBaseOptions(TypedDict, total=False): """ Prefix for all queue keys. """ - diff --git a/python/bullmq/utils.py b/python/bullmq/utils.py index 70b00a288b..7d642c29d7 100644 --- a/python/bullmq/utils.py +++ b/python/bullmq/utils.py @@ -1,6 +1,7 @@ import semver import traceback + def isRedisVersionLowerThan(current_version, minimum_version): return semver.VersionInfo.parse(current_version).compare(minimum_version) == -1 diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index a0e33404cc..3b1f578c0f 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -16,9 +16,11 @@ maximum_block_timeout = 10 # 1 millisecond is chosen because the granularity of our timestamps are milliseconds. -# Obviously we can still process much faster than 1 job per millisecond but delays and rate limits will never work with more accuracy than 1ms. +# Obviously we can still process much faster than 1 job per millisecond but delays and +# rate limits will never work with more accuracy than 1ms. 
minimum_block_timeout = 0.001 + class Worker(EventEmitter): def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], opts: WorkerOptions = {}): super().__init__() diff --git a/python/tests/bulk_tests.py b/python/tests/bulk_tests.py index 746707ed5d..cccb267b6b 100644 --- a/python/tests/bulk_tests.py +++ b/python/tests/bulk_tests.py @@ -39,6 +39,7 @@ async def process(job: Job, token: str): completed_events = Future() job_count = 1 + def completing(job: Job, result): nonlocal job_count if job_count == 2: diff --git a/python/tests/job_tests.py b/python/tests/job_tests.py index 290e222348..f3d370432a 100644 --- a/python/tests/job_tests.py +++ b/python/tests/job_tests.py @@ -27,7 +27,7 @@ async def test_set_and_get_progress_as_number(self): await job.updateProgress(42) stored_job = await Job.fromId(queue, job.id) self.assertEqual(stored_job.progress, 42) - + await queue.close() async def test_set_and_get_progress_as_object(self): @@ -36,7 +36,7 @@ async def test_set_and_get_progress_as_object(self): await job.updateProgress({"total": 120, "completed": 40}) stored_job = await Job.fromId(queue, job.id) self.assertEqual(stored_job.progress, {"total": 120, "completed": 40}) - + await queue.close() async def test_get_job_state(self): @@ -45,7 +45,7 @@ async def test_get_job_state(self): state = await job.getState() self.assertEqual(state, "waiting") - + await queue.close() async def test_job_log(self): @@ -67,9 +67,9 @@ async def test_update_job_data(self): job = await queue.add("test", {"foo": "bar"}, {}) await job.updateData({"baz": "qux"}) stored_job = await Job.fromId(queue, job.id) - + self.assertEqual(stored_job.data, {"baz": "qux"}) - + await queue.close() async def test_update_job_data_when_is_removed(self): @@ -78,7 +78,7 @@ async def test_update_job_data_when_is_removed(self): await job.remove() with self.assertRaises(TypeError): await job.updateData({"baz": "qux"}) - + await queue.close() async def test_promote_delayed_job(self): diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index e3d8b578c6..94bdb7da6a 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -38,7 +38,7 @@ async def test_get_jobs(self): job1 = await queue.add("test-job", {"foo": "bar"}, {}) job2 = await queue.add("test-job", {"foo": "bar"}, {}) jobs = await queue.getJobs(["wait"]) - + self.assertEqual(job2.id, jobs[0].id) self.assertEqual(job1.id, jobs[1].id) await queue.close() @@ -106,7 +106,7 @@ async def test_trim_events_manually(self): events_length = await queue.client.xlen(f"{queue.prefix}:{queueName}:events") self.assertEqual(events_length, 8) - await queue.trimEvents(0); + await queue.trimEvents(0) events_length = await queue.client.xlen(f"{queue.prefix}:{queue.name}:events") @@ -124,7 +124,7 @@ async def test_trim_events_manually_with_custom_prefix(self): events_length = await queue.client.xlen(f"test:{queueName}:events") self.assertEqual(events_length, 8) - await queue.trimEvents(0); + await queue.trimEvents(0) events_length = await queue.client.xlen(f"test:{queue.name}:events") @@ -374,13 +374,13 @@ async def test_promote_all_delayed_jobs(self): job_count = 8 for index in range(job_count): - data = {"idx": index} + data = { "idx": index } await queue.add("test", data=data, opts={ "delay": 5000 }) delayed_count = await queue.getJobCounts('delayed') self.assertEqual(delayed_count['delayed'], job_count) - await queue.promoteJobs(); + await queue.promoteJobs() waiting_count = await queue.getJobCounts('waiting') 
self.assertEqual(waiting_count['waiting'], job_count) diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index 011356a4d8..35f7f9dbf3 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -299,7 +299,7 @@ async def parent_process(job: Job, token: str): }) step = Step.Second elif step == Step.Second: - await queue.add('child-2', {"foo": "bar" },{ + await queue.add('child-2', { "foo": "bar" }, { "parent": { "id": job.id, "queue": job.queueQualifiedName @@ -364,7 +364,7 @@ async def test_process_job_respecting_the_concurrency_set(self): async def process(job: Job, token: str): nonlocal num_jobs_processing - nonlocal wait + nonlocal wait nonlocal pending_message_to_process num_jobs_processing += 1 self.assertLess(num_jobs_processing, 5) diff --git a/src/commands/includes/handleDuplicatedJob.lua b/src/commands/includes/handleDuplicatedJob.lua index 67cbb558d7..8c2f6bcbf3 100644 --- a/src/commands/includes/handleDuplicatedJob.lua +++ b/src/commands/includes/handleDuplicatedJob.lua @@ -9,7 +9,7 @@ local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParen parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp) local existedParentKey = rcall("HGET", jobKey, "parentKey") - if not existedParentKey then + if not existedParentKey or existedParentKey == currentParentKey then updateExistingJobsParent(currentParentKey, currentParent, parentData, parentDependenciesKey, completedKey, jobKey, jobId, timestamp) diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 0da0b1e7f3..c976889128 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -1671,6 +1671,78 @@ describe('flows', () => { await flow.close(); }); }); + + describe('when child already existed and it is re-added with same parentId', async () => { + it('moves parent to wait if child is already completed', async () => { + const worker = new Worker( + queueName, + async () => { + await new Promise(s => { + setTimeout(s, 250); + }); + }, + { + connection, + prefix, + }, + ); + + const completing = new Promise(resolve => { + worker.on('completed', (job: Job) => { + if (job.id === 'tue') { + resolve(); + } + }); + }); + + const flow = new FlowProducer({ connection, prefix }); + + await flow.add({ + queueName, + name: 'tue', + opts: { + jobId: 'tue', + removeOnComplete: true, + }, + children: [ + { + name: 'mon', + queueName, + opts: { + jobId: 'mon', + }, + }, + ], + }); + + await completing; + + const tree = await flow.add({ + queueName, + name: 'tue', + opts: { + jobId: 'tue', + }, + children: [ + { + name: 'mon', + queueName, + opts: { + jobId: 'mon', + }, + }, + ], + }); + + await delay(1000); + const state = await tree.job.getState(); + + expect(state).to.be.equal('completed'); + + await worker.close(); + await flow.close(); + }); + }); }); describe('when custom prefix is set in flow producer', async () => {
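
A standalone sketch (not part of the patch) of the scenario the new test covers: with the handleDuplicatedJob.lua change, a child added with a fixed jobId that already sits in the completed set, and whose stored parentKey matches the incoming parent, now resolves that parent's dependency, so a re-added parent is moved to wait instead of staying parked in waiting-children. The sketch below uses only the FlowProducer/Worker API already exercised in the test; the queue name, job names and ids, Redis connection values, and the 1000 ms settle delay are illustrative placeholders, not values from the patch.

import { FlowProducer, Job, Worker } from 'bullmq';

const connection = { host: 'localhost', port: 6379 }; // placeholder Redis connection
const queueName = 'example-flow-queue';               // placeholder queue name

// Parent/child tree with fixed jobIds so the second add duplicates the first.
const tree = {
  queueName,
  name: 'parent-job',
  opts: { jobId: 'parent-1' },
  children: [{ name: 'child-job', queueName, opts: { jobId: 'child-1' } }],
};

async function main() {
  // Trivial processor: every job completes immediately.
  const worker = new Worker(queueName, async () => {}, { connection });
  const flow = new FlowProducer({ connection });

  // First add: remove the parent on completion so only the completed child remains.
  const firstRunDone = new Promise<void>(resolve =>
    worker.on('completed', (job: Job) => {
      if (job.id === 'parent-1') resolve();
    }),
  );
  await flow.add({ ...tree, opts: { ...tree.opts, removeOnComplete: true } });
  await firstRunDone;

  // Second add with the same jobIds: the child already exists as a completed job.
  // With the fix, the duplicated child is acknowledged for the same parent key,
  // so the re-added parent leaves waiting-children and gets processed.
  const second = await flow.add(tree);
  await new Promise(resolve => setTimeout(resolve, 1000)); // let the worker pick it up
  console.log(await second.job.getState()); // expected: 'completed'

  await worker.close();
  await flow.close();
}

main().catch(console.error);

Before this change, handleDuplicatedJob only relinked the existing job when it had no parentKey at all; extending the condition to also accept existedParentKey == currentParentKey is what lets the second add above succeed for a child that already completed under the same parent.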