diff --git a/python/bullmq/job.py b/python/bullmq/job.py
index fe0b8837a4..7d69101937 100644
--- a/python/bullmq/job.py
+++ b/python/bullmq/job.py
@@ -5,7 +5,7 @@
 if TYPE_CHECKING:
     from bullmq.queue import Queue
 from bullmq.types import JobOptions
-from bullmq.utils import get_parent_key
+from bullmq.utils import get_parent_key, parse_json_string_values
 import json
 import time
 
@@ -210,6 +210,10 @@ async def saveStacktrace(self, pipe, err:str):
     def moveToWaitingChildren(self, token, opts:dict):
         return self.scripts.moveToWaitingChildren(self.id, token, opts)
 
+    async def getChildrenValues(self):
+        results = await self.queue.client.hgetall(f"{self.queue.prefix}:{self.queue.name}:{self.id}:processed")
+        return parse_json_string_values(results)
+
     @staticmethod
     def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
         """
diff --git a/python/bullmq/utils.py b/python/bullmq/utils.py
index 7d642c29d7..0c58457831 100644
--- a/python/bullmq/utils.py
+++ b/python/bullmq/utils.py
@@ -1,5 +1,6 @@
 import semver
 import traceback
+import json
 
 
 def isRedisVersionLowerThan(current_version, minimum_version):
@@ -18,3 +19,6 @@ def extract_result(job_task, emit_callback):
 def get_parent_key(opts: dict):
     if opts:
         return f"{opts.get('queue')}:{opts.get('id')}"
+
+def parse_json_string_values(input_dict: dict[str, str]) -> dict[str, dict]:
+    return {key: json.loads(value) for key, value in input_dict.items()}
\ No newline at end of file
diff --git a/python/tests/flow_tests.py b/python/tests/flow_tests.py
index a835453509..7f0a9a6428 100644
--- a/python/tests/flow_tests.py
+++ b/python/tests/flow_tests.py
@@ -140,5 +140,75 @@ async def process2(job: Job, token: str):
         await parent_queue.obliterate()
         await parent_queue.close()
 
+    async def test_get_children_values(self):
+        child_job_name = 'child-job'
+        children_data = [
+            {"bar": None},
+            {"baz": 12.93},
+            {"qux": "string value"}
+        ]
+        parent_queue_name = f"__test_parent_queue__{uuid4().hex}"
+
+        processing_children = Future()
+
+        processed_children = 0
+        async def process1(job: Job, token: str):
+            nonlocal processed_children
+            processed_children += 1
+            if processed_children == len(children_data):
+                processing_children.set_result(None)
+            return children_data[job.data.get("idx")]
+
+        processing_parent = Future()
+
+        async def process2(job: Job, token: str):
+            children_values = await job.getChildrenValues()
+            processing_parent.set_result(children_values)
+            return 1
+
+        def on_parent_processed(future):
+            self.assertIn(children_data[0], future.result().values())
+            self.assertIn(children_data[1], future.result().values())
+            self.assertIn(children_data[2], future.result().values())
+
+        processing_parent.add_done_callback(on_parent_processed)
+
+        parent_worker = Worker(parent_queue_name, process2)
+        children_worker = Worker(queue_name, process1)
+
+        flow = FlowProducer()
+        await flow.add(
+            {
+                "name": 'parent-job',
+                "queueName": parent_queue_name,
+                "data": {},
+                "children": [
+                    {"name": child_job_name, "data": {"idx": 0, "foo": 'bar'}, "queueName": queue_name},
+                    {"name": child_job_name, "data": {"idx": 1, "foo": 'baz'}, "queueName": queue_name},
+                    {"name": child_job_name, "data": {"idx": 2, "foo": 'qux'}, "queueName": queue_name}
+                ]
+            }
+        )
+
+        await processing_children
+        await processing_parent
+
+        await parent_worker.close()
+        await children_worker.close()
+        await flow.close()
+
+        parent_queue = Queue(parent_queue_name)
+        await parent_queue.pause()
+        await parent_queue.obliterate()
+        await parent_queue.close()
+
+    async def test_get_children_values_on_simple_jobs(self):
+        queue = Queue(queue_name)
+        job = await queue.add("test", {"foo": "bar"}, {"delay": 1500})
+        children_values = await job.getChildrenValues()
+        self.assertEqual(children_values, {})
+
+        await queue.close()
+
 if __name__ == '__main__':
     unittest.main()