Skip to content

Commit

Permalink
fix(move-to-finished): stringify any return value [python] (#2198) fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Sep 26, 2023
1 parent 3b67193 commit 07f1335
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 ./python --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Release Python
if: ${{ contains(env.commitmsg, '(python)') }}
if: ${{ contains(env.commitmsg, 'python') }}
run: |
cd python
pip install packaging
Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ async def updateProgress(self, job_id: str, progress):
return None

def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str, opts: dict, fetchNext=True) -> list[Any] | None:
transformed_value = json.dumps(val, separators=(',', ':')) if type(val) == dict else val
transformed_value = json.dumps(val, separators=(',', ':'))
timestamp = round(time.time() * 1000)
metricsKey = self.toKey('metrics:' + target)

Expand Down
56 changes: 55 additions & 1 deletion python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,60 @@ async def process(job: Job, token: str):
await worker.close(force=True)
await queue.close()

async def test_process_job_with_array_as_return_value(self):
queue = Queue(queueName)
data = {"foo": "bar"}
job = await queue.add("test-job", data, {"removeOnComplete": False})

async def process(job: Job, token: str):
print("Processing job", job)
return ['foo']

worker = Worker(queueName, process)

processing = Future()
worker.on("completed", lambda job, result: processing.set_result(None))

await processing

completedJob = await Job.fromId(queue, job.id)

self.assertEqual(completedJob.id, job.id)
self.assertEqual(completedJob.attemptsMade, 1)
self.assertEqual(completedJob.data, data)
self.assertEqual(completedJob.returnvalue, ['foo'])
self.assertNotEqual(completedJob.finishedOn, None)

await worker.close(force=True)
await queue.close()

async def test_process_job_with_boolean_as_return_value(self):
queue = Queue(queueName)
data = {"foo": "bar"}
job = await queue.add("test-job", data, {"removeOnComplete": False})

async def process(job: Job, token: str):
print("Processing job", job)
return True

worker = Worker(queueName, process)

processing = Future()
worker.on("completed", lambda job, result: processing.set_result(None))

await processing

completedJob = await Job.fromId(queue, job.id)

self.assertEqual(completedJob.id, job.id)
self.assertEqual(completedJob.attemptsMade, 1)
self.assertEqual(completedJob.data, data)
self.assertEqual(completedJob.returnvalue, True)
self.assertNotEqual(completedJob.finishedOn, None)

await worker.close(force=True)
await queue.close()

async def test_process_jobs_fail(self):
queue = Queue(queueName)
data = {"foo": "bar"}
Expand All @@ -75,7 +129,7 @@ async def process(job: Job, token: str):
self.assertEqual(failedJob.id, job.id)
self.assertEqual(failedJob.attemptsMade, 1)
self.assertEqual(failedJob.data, data)
self.assertEqual(failedJob.failedReason, failedReason)
self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
self.assertEqual(len(failedJob.stacktrace), 1)
self.assertEqual(failedJob.returnvalue, None)
self.assertNotEqual(failedJob.finishedOn, None)
Expand Down

0 comments on commit 07f1335

Please sign in to comment.