-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_celery.py
84 lines (66 loc) · 2.66 KB
/
check_celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python2.7
"""Celery worker status checker"""
import argparse
import re
import subprocess
import sys
class NagiosPlugin(object):  # pragma: no cover
    """Minimal base class for a Nagios-style check.

    Subclasses implement ``run_check``. Each ``*_state`` method prints one
    status line of the form ``"<STATE> - <msg>"`` to stdout and then ends
    the process via ``sys.exit`` with the matching exit code below.
    """

    # Exit codes handed to sys.exit() by the *_state methods
    # (these values match the Nagios plugin return codes).
    OK = 0
    CRITICAL = 2
    UNKNOWN = 3

    def run_check(self):
        """Perform the check; subclasses must override this."""
        raise NotImplementedError

    def _emit_and_exit(self, status_line, exit_code):
        """Print *status_line* and terminate with *exit_code*."""
        print(status_line)
        sys.exit(exit_code)

    def ok_state(self, msg):
        """Report an OK result carrying *msg*, then exit with ``OK``."""
        self._emit_and_exit("OK - {}".format(msg), self.OK)

    def critical_state(self, msg):
        """Report a CRITICAL result carrying *msg*, then exit with ``CRITICAL``."""
        self._emit_and_exit("CRITICAL - {}".format(msg), self.CRITICAL)

    def unknown_state(self, msg):
        """Report an UNKNOWN result carrying *msg*, then exit with ``UNKNOWN``."""
        self._emit_and_exit("UNKNOWN - {}".format(msg), self.UNKNOWN)
class CeleryWorkerCheck(NagiosPlugin):
    """Check that each named celery worker node reports as running.

    Runs ``service <service> status`` and searches its output for a
    ``"... (pid N) is up|running"`` line per requested worker. Reports
    CRITICAL listing any workers without such a line, UNKNOWN if the status
    command cannot be run, and OK otherwise.
    """

    OK_STATUS_MSG = 'All workers running'
    UNKNOWN_STATUS_MSG = 'Unable to get worker status(es)'
    # Raw string so \( and \d reach the regex engine rather than being
    # (invalid) string escapes. {service} and {worker} are filled with
    # re.escape()d values in _check_worker_status.
    # NOTE(review): the "(node ...)" group is optional, so a status line with
    # no node name (single-node output) satisfies every requested worker.
    WORKER_REGEX_TPL = r'{service} (\(node {worker}\) )?\(pid \d+\) is (up|running)'
    CRITICAL_STATUS_MSG_TPL = '{} worker(s) down'

    def __init__(self, workers, service):
        """
        :param workers: iterable of worker node names to look for.
        :param service: name of the service script passed to ``service``.
        """
        self._workers = workers
        self._service = service
        self._check_cmd = ['service', service, 'status']
        self._status_output = None
        self._critical_workers = []

    def run_check(self):
        """Fetch the status output, evaluate each worker, report and exit."""
        self._set_status_output()
        self._check_status_output()
        self._report_results()

    def _set_status_output(self):
        """Run the status command and store its text output.

        A non-zero return code of 1 is still parsed as status output
        (presumably the script's "some workers not running" result -
        confirm against the service script). Any other non-zero code, or
        failure to launch the command at all, reports UNKNOWN and exits.
        """
        try:
            # universal_newlines=True yields text (not bytes) on Python 3 as
            # well, so the str regex in _check_worker_status can match it.
            self._status_output = subprocess.check_output(
                self._check_cmd, universal_newlines=True)
        except subprocess.CalledProcessError as process_exc:
            if process_exc.returncode == 1:
                self._status_output = process_exc.output
            else:
                self.unknown_state(self.UNKNOWN_STATUS_MSG)
        except OSError:
            # e.g. the `service` executable is missing or not executable;
            # report UNKNOWN instead of crashing with a traceback.
            self.unknown_state(self.UNKNOWN_STATUS_MSG)

    def _check_status_output(self):
        """Evaluate every requested worker against the stored output."""
        for worker in self._workers:
            self._check_worker_status(worker)

    def _check_worker_status(self, worker):
        """Record *worker* as critical if no up/running line exists for it."""
        # Escape both names so characters such as '.', '@', '+' or '(' are
        # matched literally instead of being interpreted as regex syntax.
        worker_regex = self.WORKER_REGEX_TPL.format(
            service=re.escape(self._service),
            worker=re.escape(worker),
        )
        if not re.search(worker_regex, self._status_output):
            self._critical_workers.append(worker)

    def _report_results(self):
        """Exit CRITICAL naming any down workers, otherwise exit OK."""
        if self._critical_workers:
            workers_display = ', '.join(self._critical_workers)
            # critical_state exits the process, so ok_state is not reached.
            self.critical_state(self.CRITICAL_STATUS_MSG_TPL.format(workers_display))
        self.ok_state(self.OK_STATUS_MSG)
def main():  # pragma: no cover
    """Command-line entry point for the celery worker check.

    Positional arguments are the worker node names to verify; ``--service``
    names the service script (default ``celeryd``). The check reports its
    result and ends the process through ``sys.exit``.
    """
    arg_parser = argparse.ArgumentParser(
        description='Celery worker status checker',
    )
    arg_parser.add_argument(
        'workers',
        nargs='+',
        help="Worker node names to check.",
    )
    arg_parser.add_argument(
        '--service',
        default='celeryd',
        help="Service script used to manage celery.",
    )
    options = arg_parser.parse_args()
    CeleryWorkerCheck(options.workers, options.service).run_check()
# Run the check only when executed as a script, not when imported.
if __name__ == '__main__':  # pragma: no cover
    main()