From df9a5fa217553398418840f1325f4d6600e8751b Mon Sep 17 00:00:00 2001 From: Elia Soldati Date: Fri, 6 Dec 2024 14:39:10 +0100 Subject: [PATCH] Prometheus Exporter (#1303) Add a new command: cmsPrometheusExporter This will start a new HTTP server that listens on a port (defaults to 8811) for requests to /metrics. At this endpoint many metrics about a contest (or all contests) are exposed for Prometheus to be collected. Those metrics consider both system values (queue status, connected workers, ...) and contest values (submissions, users, questions, ...) This exporter works by executing simple queries to the database, and obtains the queue information asking Evaluation Service. The metrics exposed may leak information about the contest, so it's important to secure this endpoint (for example using a reverse proxy, or binding localhost). With this exporter it's possible to build interactive and real-time dashboard for monitoring the status of the contest. --------- Co-authored-by: Edoardo Morassutto --- cmscontrib/PrometheusExporter.py | 350 +++++++++++++++++++++++++++++++ config/cms.conf.sample | 10 +- requirements.txt | 3 + setup.py | 1 + 4 files changed, 363 insertions(+), 1 deletion(-) create mode 100644 cmscontrib/PrometheusExporter.py diff --git a/cmscontrib/PrometheusExporter.py b/cmscontrib/PrometheusExporter.py new file mode 100644 index 0000000000..5a69b22f0d --- /dev/null +++ b/cmscontrib/PrometheusExporter.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 + +# Contest Management System - http://cms-dev.github.io/ +# Copyright © 2020 Edoardo Morassutto +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +# We enable monkey patching to make many libraries gevent-friendly +# (for instance, urllib3, used by requests) +import gevent.monkey + +from cms.db.contest import Contest + +gevent.monkey.patch_all() # noqa + +import argparse +import logging + +from prometheus_client import start_http_server +from prometheus_client.core import REGISTRY, CounterMetricFamily, GaugeMetricFamily +from sqlalchemy import func, distinct + +from cms import ServiceCoord +from cms import config +from cms.db import ( + Announcement, + Dataset, + Message, + Participation, + Question, + SessionGen, + Submission, + SubmissionResult, + Task, +) +from cms.io.service import Service +from cms.io.rpc import RPCError +from cms.server.admin.server import AdminWebServer + +logger = logging.getLogger(__name__) + +class PrometheusExporter(Service): + def __init__(self, args): + super().__init__() + + self.host = args.host + self.port = args.port + + self.export_submissions = not args.no_submissions + self.export_workers = not args.no_workers + self.export_queue = not args.no_queue + self.export_communiactions = not args.no_communications + self.export_users = not args.no_users + self.evaluation_service = None + + def run(self): + REGISTRY.register(self) + start_http_server(self.port, addr=self.host) + logger.info("Started at http://%s:%s/metric", self.host, self.port) + super().run() + + def collect(self): + with SessionGen() as session: + if self.export_submissions: + yield from self._collect_submissions(session) + if self.export_communiactions: + yield from self._collect_communications(session) + if self.export_users: + yield from self._collect_users(session) + + if self.evaluation_service is None: + try: + self.evaluation_service = self.connect_to(ServiceCoord("EvaluationService", 0)) + except RPCError: + pass + if self.evaluation_service is not None: + try: + if self.export_workers: + yield from self._collect_workers() + if self.export_queue: + yield from self._collect_queue() + except RPCError: + self.evaluation_service = None + + metric = GaugeMetricFamily("cms_es_is_up", "Whether the Evaluation Service is currently up") + metric.add_metric([], 1 if self.evaluation_service is not None else 0) + yield metric + + def _collect_submissions(self, session): + # compiling / max_compilations / compilation_fail / evaluating / + # max_evaluations / scoring / scored / total + stats = AdminWebServer.submissions_status(None) + metric = GaugeMetricFamily( + "cms_submissions", + "Number of submissions per category", + labels=["status"], + ) + for status, count in stats.items(): + metric.add_metric([status], count) + yield metric + + metric = CounterMetricFamily( + "cms_task_submissions", + "Number of submissions per task", + labels=["task"], + ) + data = ( + session.query(Task.name, func.count(SubmissionResult.submission_id)) + .select_from(SubmissionResult) + .join(Dataset) + .join(Task, Dataset.task_id == Task.id) + .filter(Task.active_dataset_id == SubmissionResult.dataset_id) + .group_by(Task.name) + .all() + ) + for task_name, count in data: + metric.add_metric([task_name], count) + yield metric + + metric = CounterMetricFamily( + "cms_submissions_language", + "Number of submissions per language", + labels=["language"], + ) + data = ( + session.query(Submission.language, func.count(Submission.id)) + .select_from(Submission) + .group_by(Submission.language) + .all() + ) + for language, count in data: + metric.add_metric([language], count) + yield metric + + def _collect_workers(self): + status = self.evaluation_service.workers_status().get() + metric = GaugeMetricFamily( + "cms_workers", + "Number of cmsWorker instances", + labels=["status"], + ) + metric.add_metric(["total"], len(status)) + metric.add_metric( + ["connected"], sum(1 for worker in status.values() if worker["connected"]) + ) + metric.add_metric( + ["working"], sum(1 for worker in status.values() if worker["operations"]) + ) + yield metric + + def _collect_queue(self): + status = self.evaluation_service.queue_status().get() + + metric = GaugeMetricFamily("cms_queue_length", "Number of entries in the queue") + metric.add_metric([], len(status)) + yield metric + + metric = GaugeMetricFamily( + "cms_queue_item_types", + "Types of items in the queue", + labels=["type"], + ) + types = {} + for item in status: + typ = item["item"]["type"] + types.setdefault(typ, 0) + types[typ] += 1 + for typ, count in types.items(): + metric.add_metric([typ], count) + yield metric + + metric = GaugeMetricFamily( + "cms_queue_oldest_job", + "Timestamp of the oldest job in the queue", + ) + if status: + oldest = min(status, key=lambda x: x["timestamp"]) + metric.add_metric([], oldest["timestamp"]) + yield metric + + def _collect_communications(self, session): + metric = CounterMetricFamily( + "cms_questions", + "Number of questions", + labels=["status"], + ) + data = session.query(func.count(Question.id)).select_from(Question).all() + metric.add_metric(["total"], data[0][0]) + data = ( + session.query(func.count(Question.id)) + .select_from(Question) + .filter(Question.ignored == True) + .all() + ) + metric.add_metric(["ignored"], data[0][0]) + data = ( + session.query(func.count(Question.id)) + .select_from(Question) + .filter(Question.reply_timestamp != None) + .all() + ) + metric.add_metric(["answered"], data[0][0]) + yield metric + + metric = CounterMetricFamily("cms_messages", "Number of private messages") + data = session.query(func.count(Message.id)).select_from(Message).all() + metric.add_metric([], data[0][0]) + yield metric + + metric = CounterMetricFamily("cms_announcements", "Number of announcements") + data = ( + session.query(func.count(Announcement.id)).select_from(Announcement).all() + ) + metric.add_metric([], data[0][0]) + yield metric + + def _collect_users(self, session): + metric = GaugeMetricFamily( + "cms_participations", + "Number of participations grouped by category and contest", + labels=["category", "contest"], + ) + data = ( + session.query(Participation.contest_id, func.count(Participation.id)) + .select_from(Participation) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["total", str(contest_id)], count) + data = ( + session.query(Participation.contest_id, func.count(Participation.id)) + .select_from(Participation) + .filter(Participation.hidden == True) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["hidden", str(contest_id)], count) + data = ( + session.query(Participation.contest_id, func.count(Participation.id)) + .select_from(Participation) + .filter(Participation.unrestricted == True) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["unrestricted", str(contest_id)], count) + data = ( + session.query(Participation.contest_id, func.count(Participation.id)) + .select_from(Participation) + .filter(Participation.starting_time != None) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["started", str(contest_id)], count) + data = ( + session.query( + Participation.contest_id, func.count(distinct(Participation.id)) + ) + .select_from(Participation) + .join(Submission) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["submitted", str(contest_id)], count) + data = ( + session.query( + Participation.contest_id, func.count(distinct(Participation.id)) + ) + .select_from(Participation) + .join(Submission) + .join(SubmissionResult) + .join(Dataset) + .join(Task, Dataset.task_id == Task.id) + .filter(Task.active_dataset_id == SubmissionResult.dataset_id) + .filter(SubmissionResult.score > 0) + .group_by(Participation.contest_id) + .all() + ) + for contest_id, count in data: + metric.add_metric(["non_zero", str(contest_id)], count) + yield metric + + +def main(): + """Parse arguments and launch process.""" + parser = argparse.ArgumentParser(description="Prometheus exporter.") + parser.add_argument( + "--host", + help="IP address to bind to", + default=config.prometheus_listen_address, + ) + parser.add_argument( + "--port", + help="Port to use", + default=config.prometheus_listen_port, + type=int, + ) + parser.add_argument( + "--no-submissions", + help="Do not export submissions metrics", + action="store_true", + ) + parser.add_argument( + "--no-workers", + help="Do not export workers metrics", + action="store_true", + ) + parser.add_argument( + "--no-queue", + help="Do not export queue metrics", + action="store_true", + ) + parser.add_argument( + "--no-communications", + help="Do not export communications metrics", + action="store_true", + ) + parser.add_argument( + "--no-users", + help="Do not export users metrics", + action="store_true", + ) + + # unsed, but passed by ResourceService + parser.add_argument("shard", default="", help="unused") + parser.add_argument("-c", "--contest", default="", help="unused") + + args = parser.parse_args() + + service = PrometheusExporter(args) + service.run() + + +if __name__ == "__main__": + main() diff --git a/config/cms.conf.sample b/config/cms.conf.sample index eb486de6f5..be43efc5af 100644 --- a/config/cms.conf.sample +++ b/config/cms.conf.sample @@ -43,7 +43,8 @@ "ContestWebServer": [["localhost", 21000]], "AdminWebServer": [["localhost", 21100]], "ProxyService": [["localhost", 28600]], - "PrintingService": [["localhost", 25123]] + "PrintingService": [["localhost", 25123]], + "PrometheusExporter": [] }, "other_services": @@ -189,6 +190,13 @@ "pdf_printing_allowed": false, + "_section": "PrometheusExporter", + + "_help": "Listening HTTP address and port for the exporter. If exposed", + "_help": "this may leak private information, make sure to secure this endpoint.", + "prometheus_listen_address": "127.0.0.1", + "prometheus_listen_port": 8811, + "_help": "This is the end of this file." } diff --git a/requirements.txt b/requirements.txt index 21d340b9ba..1040c86341 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,6 @@ pyyaml>=5.3,<6.1 # http://pyyaml.org/wiki/PyYAML # Only for printing: pycups==2.0.4 # https://pypi.python.org/pypi/pycups PyPDF2>=1.26,<3.1 # https://github.com/mstamy2/PyPDF2/blob/master/CHANGELOG + +# Only for cmsPrometheusExporter +prometheus-client>=0.7,<0.8 # https://pypi.org/project/prometheus-client diff --git a/setup.py b/setup.py index a6e036f1dd..5bc819d731 100755 --- a/setup.py +++ b/setup.py @@ -168,6 +168,7 @@ def run(self): "cmsRemoveUser=cmscontrib.RemoveUser:main", "cmsSpoolExporter=cmscontrib.SpoolExporter:main", "cmsMake=cmstaskenv.cmsMake:main", + "cmsPrometheusExporter=cmscontrib.PrometheusExporter:main", ], "cms.grading.tasktypes": [ "Batch=cms.grading.tasktypes.Batch:Batch",