Skip to content

Commit

Permalink
feat(queue): add clean method [python] (#2194)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Sep 24, 2023
1 parent c948c8a commit 3b67193
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
11 changes: 11 additions & 0 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ async def getJobCounts(self, *types):
counts[current_types[index]] = val or 0
return counts

async def clean(self, grace: int, limit: int, type: str):
"""
Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
grace period
* @returns: Id jobs from the deleted records
"""
jobs = await self.scripts.cleanJobsInSet(type, grace, limit)

return jobs

def getJobState(self, job_id: str):
return self.scripts.getState(job_id)

Expand Down
12 changes: 12 additions & 0 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.commands = {
"addJob": self.redisClient.register_script(self.getScript("addJob-9.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-5.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
Expand Down Expand Up @@ -116,6 +117,17 @@ def addParentJob(self, job: Job, waiting_children_key: str, pipe = None):

return self.commands["addJob"](keys=keys, args=args, client=pipe)

def cleanJobsInSetArgs(self, set: str, grace: int, limit:int = 0):
keys = [self.toKey(set),
self.keys['events']]
args = [self.keys[''], round(time.time() * 1000) - grace, limit, set]

return (keys, args)

def cleanJobsInSet(self, set: str, grace: int = 0, limit:int = 0):
keys, args = self.cleanJobsInSetArgs(set, grace, limit)
return self.commands["cleanJobsInSet"](keys=keys, args=args)

def moveToWaitingChildrenArgs(self, job_id, token, opts):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
Expand Down

0 comments on commit 3b67193

Please sign in to comment.