diff --git a/redash/tasks/general.py b/redash/tasks/general.py index e0090dfc17..7d2ab0d6bf 100644 --- a/redash/tasks/general.py +++ b/redash/tasks/general.py @@ -1,8 +1,7 @@ import requests from flask_mail import Message -from rq import Connection, Queue -from rq.registry import FailedJobRegistry +from rq.job import Job from redash import mail, models, settings, rq_redis_connection from redash.models import users from redash.version_check import run_version_check @@ -65,30 +64,15 @@ def sync_user_details(): def purge_failed_jobs(): - jobs = rq_redis_connection.scan_iter('rq:job:*') + jobs = rq_redis_connection.scan_iter('{}*'.format(Job.redis_job_namespace_prefix)) is_idle = lambda key: rq_redis_connection.object('idletime', key) > settings.JOB_DEFAULT_FAILURE_TTL has_failed = lambda key: rq_redis_connection.hget(key, 'status') == b'failed' + stale_job_keys = [key.decode().split(':').pop() for key in jobs if is_idle(key) and has_failed(key)] + stale_jobs = Job.fetch_many(stale_job_keys, rq_redis_connection) - def not_in_any_failed_registry(key): - """ This function should reject any key which is inside any FailedJobRegistry. - However, at the moment on RQ v1.1 the @job decorator does not allow setting of a failure_ttl, so jobs - are kept inside the FailedJobRegistry for a year and there is no easy way tweak that. - This has already been fixed on rq/master (https://github.com/rq/rq/pull/1130) and will be available on the - next release. Until then, we simply don't reject keys in a FailedJobRegistry and purge any failed jobs - that have been idle for over settings.JOB_DEFAULT_FAILURE_TTL. - Once a new RQ release is available, we can delete this comment and the following line:""" - return True - - with Connection(rq_redis_connection): - failed_registries = [FailedJobRegistry(queue=q) for q in Queue.all()] - - job_id = lambda key: key.decode().split(':').pop() - return all([job_id(key) not in registry for registry in failed_registries]) - - stale_jobs = [key for key in jobs if is_idle(key) and has_failed(key) and not_in_any_failed_registry(key)] - - for key in stale_jobs: - rq_redis_connection.delete(key) + for job in stale_jobs: + job.delete() + rq_redis_connection.delete(job.key) logger.info('Purged %d old failed jobs.', len(stale_jobs))