Skip to content

Commit

Permalink
simplified clenaup by deleting both job data and registry entry
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Nov 6, 2019
1 parent b552095 commit 02555b9
Showing 1 changed file with 7 additions and 23 deletions.
30 changes: 7 additions & 23 deletions redash/tasks/general.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import requests

from flask_mail import Message
from rq import Connection, Queue
from rq.registry import FailedJobRegistry
from rq.job import Job
from redash import mail, models, settings, rq_redis_connection
from redash.models import users
from redash.version_check import run_version_check
Expand Down Expand Up @@ -65,30 +64,15 @@ def sync_user_details():


def purge_failed_jobs():
jobs = rq_redis_connection.scan_iter('rq:job:*')
jobs = rq_redis_connection.scan_iter('{}*'.format(Job.redis_job_namespace_prefix))

is_idle = lambda key: rq_redis_connection.object('idletime', key) > settings.JOB_DEFAULT_FAILURE_TTL
has_failed = lambda key: rq_redis_connection.hget(key, 'status') == b'failed'
stale_job_keys = [key.decode().split(':').pop() for key in jobs if is_idle(key) and has_failed(key)]
stale_jobs = Job.fetch_many(stale_job_keys, rq_redis_connection)

def not_in_any_failed_registry(key):
""" This function should reject any key which is inside any FailedJobRegistry.
However, at the moment on RQ v1.1 the @job decorator does not allow setting of a failure_ttl, so jobs
are kept inside the FailedJobRegistry for a year and there is no easy way tweak that.
This has already been fixed on rq/master (https://github.com/rq/rq/pull/1130) and will be available on the
next release. Until then, we simply don't reject keys in a FailedJobRegistry and purge any failed jobs
that have been idle for over settings.JOB_DEFAULT_FAILURE_TTL.
Once a new RQ release is available, we can delete this comment and the following line:"""
return True

with Connection(rq_redis_connection):
failed_registries = [FailedJobRegistry(queue=q) for q in Queue.all()]

job_id = lambda key: key.decode().split(':').pop()
return all([job_id(key) not in registry for registry in failed_registries])

stale_jobs = [key for key in jobs if is_idle(key) and has_failed(key) and not_in_any_failed_registry(key)]

for key in stale_jobs:
rq_redis_connection.delete(key)
for job in stale_jobs:
job.delete()
rq_redis_connection.delete(job.key)

logger.info('Purged %d old failed jobs.', len(stale_jobs))

0 comments on commit 02555b9

Please sign in to comment.