Skip to content

Commit

Permalink
Python 3 RQ modifications (#4281)
Browse files Browse the repository at this point in the history
* show current worker job (along with minor cosmetic column tweaks)

* avoid loading entire job data for queued jobs

* track general RQ queues (default, periodic and schemas)

* get all active RQ queues

* call get_celery_queues in another place

* merge dicts the Python 3 way

* extend the result_ttl of refresh_queries to 600 seconds to allow it to continue running periodically even after longer executions
  • Loading branch information
Omer Lachish authored and arikfr committed Oct 24, 2019
1 parent 7f09da6 commit be32197
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
8 changes: 5 additions & 3 deletions client/app/components/admin/RQStatus.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ const workersColumns = [Columns.custom(
/> {value}
</span>
), { title: 'State', dataIndex: 'state' },
)].concat(map(['Hostname', 'PID', 'Name', 'Queues', 'Successful Job Count',
'Failed Job Count', 'Birth Date', 'Total Working Time'],
c => ({ title: c, dataIndex: c.toLowerCase().replace(/\s/g, '_') })));
)].concat(map(['Hostname', 'PID', 'Name', 'Queues', 'Current Job', 'Successful Jobs', 'Failed Jobs'],
c => ({ title: c, dataIndex: c.toLowerCase().replace(/\s/g, '_') }))).concat([
Columns.dateTime({ title: 'Birth Date', dataIndex: 'birth_date' }),
Columns.duration({ title: 'Total Working Time', dataIndex: 'total_working_time' }),
]);

const queuesColumns = map(
['Name', 'Started', 'Queued'],
Expand Down
5 changes: 2 additions & 3 deletions client/app/pages/admin/Jobs.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ class Jobs extends React.Component {
}

processQueues = ({ queues, workers }) => {
const queueCounters = values(queues).map(({ name, started, queued }) => ({
name,
const queueCounters = values(queues).map(({ started, ...rest }) => ({
started: started.length,
queued: queued.length,
...rest,
}));

const overallCounters = queueCounters.reduce(
Expand Down
25 changes: 12 additions & 13 deletions redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_object_counts():
return status


def get_queues():
def get_celery_queues():
queue_names = db.session.query(DataSource.queue_name).distinct()
scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct()
query = db.session.execute(union_all(queue_names, scheduled_queue_names))
Expand All @@ -35,14 +35,8 @@ def get_queues():


def get_queues_status():
    """Return a mapping of queue name to its current size.

    Combines two sources of queues:
      * legacy Celery queues (one Redis list per queue name), sized via LLEN
      * RQ queues discovered through ``Queue.all()``, sized via ``len(queue)``

    Returns:
        dict: ``{queue_name: {'size': <number of queued jobs>}}``. On a name
        collision the RQ entry wins, because it is merged last.
    """
    celery_queues = {queue: {'size': redis_connection.llen(queue)}
                     for queue in get_celery_queues()}
    rq_queues = {queue.name: {'size': len(queue)}
                 for queue in Queue.all(connection=redis_connection)}
    # NOTE(review): this span contained diff residue — the old loop body and
    # the new merged-dict return were concatenated; only the merged form is kept.
    return {**celery_queues, **rq_queues}


def get_db_sizes():
Expand Down Expand Up @@ -134,7 +128,7 @@ def celery_tasks():
tasks = parse_tasks(celery.control.inspect().active(), 'active')
tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved')

for queue_name in get_queues():
for queue_name in get_celery_queues():
tasks += get_waiting_in_queue(queue_name)

return tasks
Expand All @@ -155,10 +149,14 @@ def rq_queues():
q.name: {
'name': q.name,
'started': fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
'queued': fetch_jobs(q, q.job_ids)
'queued': len(q.job_ids)
} for q in Queue.all(connection=redis_connection)}


def describe_job(job):
    """Return a short human-readable label for an RQ job.

    The label is ``"<job id> (<unqualified function name>)"``, e.g.
    ``"abc123 (refresh_queries)"``. Returns ``None`` when no job is
    given (e.g. for an idle worker).
    """
    if not job:
        return None
    # Keep only the last dotted component of the fully-qualified func name.
    short_name = job.func_name.rsplit(".", 1)[-1]
    return '{} ({})'.format(job.id, short_name)


def rq_workers():
return [{
'name': w.name,
Expand All @@ -168,8 +166,9 @@ def rq_workers():
'state': w.state,
'last_heartbeat': w.last_heartbeat,
'birth_date': w.birth_date,
'successful_job_count': w.successful_job_count,
'failed_job_count': w.failed_job_count,
'current_job': describe_job(w.get_current_job()),
'successful_jobs': w.successful_job_count,
'failed_jobs': w.failed_job_count,
'total_working_time': w.total_working_time
} for w in Worker.all(connection=redis_connection)]

Expand Down
2 changes: 1 addition & 1 deletion redash/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def schedule_periodic_jobs():
job.delete()

jobs = [
{"func": refresh_queries, "interval": 30},
{"func": refresh_queries, "interval": 30, "result_ttl": 600},
{"func": empty_schedules, "interval": timedelta(minutes=60)},
{"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)},
{"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)},
Expand Down

0 comments on commit be32197

Please sign in to comment.