diff --git a/docker-compose/docker-compose.prod.yaml b/docker-compose/docker-compose.prod.yaml index 2972a47..cee53ca 100644 --- a/docker-compose/docker-compose.prod.yaml +++ b/docker-compose/docker-compose.prod.yaml @@ -83,6 +83,8 @@ services: condition: service_healthy prod_initialise: condition: service_completed_successfully + ports: + - "8004:8000" # We expose port 8000 from the container as 8004 to the world. build: context: ../ dockerfile: docker-compose/python.Dockerfile @@ -104,6 +106,8 @@ services: condition: service_healthy prod_initialise: condition: service_completed_successfully + ports: + - "8005:8000" # We expose port 8000 from the container as 8005 to the world. build: context: ../ dockerfile: docker-compose/python.Dockerfile diff --git a/duty_board/__main__.py b/duty_board/__main__.py index e9f966f..72ca9a3 100644 --- a/duty_board/__main__.py +++ b/duty_board/__main__.py @@ -7,8 +7,8 @@ import click from alembic.config import CommandLine from click import Context +from prometheus_client import start_http_server -from duty_board import worker_loop from duty_board.alchemy import update_duty_calendars from duty_board.alchemy.session import create_session from duty_board.plugin.abstract_plugin import AbstractPlugin @@ -48,16 +48,24 @@ def update_calendars() -> None: @cli.command() def calendar_refresher() -> None: + # Local imports so that only the relevant prometheus-client metrics are present. + from duty_board import worker_calendars + logger.info("Starting the worker to refresh the calendars.") plugin: AbstractPlugin = plugin_fetcher.get_plugin() - worker_loop.enter_calendar_refresher_loop(plugin) + start_http_server(port=8000) + worker_calendars.enter_calendar_refresher_loop(plugin) @cli.command() def duty_officer_refresher() -> None: + # Local imports so that only the relevant prometheus-client metrics are present. + from duty_board import worker_duty_officer + logger.info("Starting the worker to refresh the persons.") plugin: AbstractPlugin = plugin_fetcher.get_plugin() - worker_loop.enter_duty_officer_refresher_loop(plugin) + start_http_server(port=8000) + worker_duty_officer.enter_duty_officer_refresher_loop(plugin) @cli.command(name="webserver", context_settings={"ignore_unknown_options": True, "allow_extra_args": True}) diff --git a/duty_board/plugin/abstract_plugin.py b/duty_board/plugin/abstract_plugin.py index 979c99a..7cb7221 100644 --- a/duty_board/plugin/abstract_plugin.py +++ b/duty_board/plugin/abstract_plugin.py @@ -27,6 +27,7 @@ class AbstractPlugin(ABC): In case you want to know more check here.
Cheers! """ + interval_worker_metrics_update: ClassVar[datetime.timedelta] = datetime.timedelta(seconds=30) announcement_background_color_hex: ClassVar[str] = "#FF0000" announcement_text_color_hex: ClassVar[str] = "#FFFFFF" diff --git a/duty_board/server.py b/duty_board/server.py index 9fa3cb7..1bee479 100644 --- a/duty_board/server.py +++ b/duty_board/server.py @@ -6,6 +6,7 @@ import pytz from fastapi import FastAPI, HTTPException +from prometheus_client import Gauge from prometheus_fastapi_instrumentator import Instrumentator from pytz.exceptions import UnknownTimeZoneError from pytz.tzinfo import BaseTzInfo @@ -47,6 +48,7 @@ Path(os.environ["PROMETHEUS_MULTIPROC_DIR"]).mkdir(exist_ok=True) # Setup metrics collection for our FastAPI endpoints. Instrumentator().instrument(app).expose(app) +calendar_events_gauge = Gauge("duty_events_per_calendar", "Currently planned events per calendar", ["calendar_name"]) CURRENT_DIR: Final[Path] = Path(__file__).absolute().parent @@ -80,6 +82,11 @@ def _get_config_object(timezone_object: BaseTzInfo) -> _Config: ) +def collect_calendar_metrics(calendars: List[_Calendar]) -> None: + for calendar in calendars: + calendar_events_gauge.labels(calendar.name).set(len(calendar.events)) + + @app.get("/schedule", response_model=CurrentSchedule) async def get_schedule(timezone: str) -> CurrentSchedule: timezone_object = _parse_timezone_str(timezone) @@ -91,6 +98,7 @@ async def get_schedule(timezone: str) -> CurrentSchedule: all_encountered_person_uids=all_encountered_person_uids, timezone=timezone_object, ) + collect_calendar_metrics(calendars=calendars) persons: Dict[int, _PersonEssentials] = api_queries.get_peoples_essentials( session=session, all_person_uids=all_encountered_person_uids, diff --git a/duty_board/worker_calendars.py b/duty_board/worker_calendars.py new file mode 100644 index 0000000..4869ea2 --- /dev/null +++ b/duty_board/worker_calendars.py @@ -0,0 +1,105 @@ +import logging +import time +import traceback +from datetime import datetime, timezone +from typing import Optional, Tuple + +from pendulum.datetime import DateTime +from prometheus_client import Counter, Gauge +from sqlalchemy import Select, func, select +from sqlalchemy.orm import Session as SASession + +from duty_board.alchemy.session import create_session +from duty_board.models.calendar import Calendar +from duty_board.plugin.abstract_plugin import AbstractPlugin + +logger = logging.getLogger(__name__) + +calendars_gauge = Gauge("duty_calendar_amount", "Number of calendars listed in the database.") +calendars_outdated_gauge = Gauge("duty_calendars_outdated_amount", "Number of calendars that require an update.") +calendars_errors_gauge = Gauge("duty_calendars_errors_amount", "Number of calendars that have errors.") +calendar_refresh_run_counter = Counter("duty_calendars_refresh_counter", "Count the number of calendar refresh runs.") +calendar_refresh_run_success_counter = Counter( + "duty_calendars_refresh_success_counter", "Count the number of calendar refresh runs that succeed." +) +calendar_refresh_run_failed_counter = Counter( + "duty_calendars_refresh_failed_counter", "Count the number of calendar refresh runs that fail." +) +calendar_refresh_failed = Gauge( + "duty_calendars_last_refresh_failed", "Indicate whether the last refresh failed", ["calendar_name"] +) + + +def get_most_outdated_calendar(plugin: AbstractPlugin, session: SASession) -> Optional[Calendar]: + update_calendars_with_last_update_before: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency + stmt: Select[Tuple[Calendar]] = ( + select(Calendar) + .where(Calendar.last_update_utc <= update_calendars_with_last_update_before) + .order_by(Calendar.last_update_utc) + .limit(1) + ) + return session.scalar(stmt) + + +def update_the_most_outdated_calendar(plugin: AbstractPlugin) -> None: + calendar_refresh_run_counter.inc() + failed: bool = False + try: + with create_session() as session: + calendar: Optional[Calendar] + if (calendar := get_most_outdated_calendar(plugin=plugin, session=session)) is None: + logger.debug("Nothing to update here :).") + time.sleep(1) # Avoid overload on the database. + return + + logger.info(f"Updating {calendar=}.") + try: + calendar = plugin.sync_calendar(calendar=calendar, session=session) + logger.debug(f"Successfully executed plugins sync_calendar() for {calendar=}.") + calendar.error_msg = None + except Exception: + failed = True + logger.exception(f"Failed to update {calendar=}.") + calendar.error_msg = traceback.format_exc() + finally: + calendar.last_update_utc = DateTime.utcnow() + session.merge(calendar) + logger.info("Successfully updated the state of the calendar in the database.") + except Exception: + failed = True + logger.exception("Failed to update some calendar in the database. There is probably some database error.") + + if calendar is not None: + calendar_refresh_failed.labels(calendar.name).set(int(failed)) + if failed: + calendar_refresh_run_failed_counter.inc() + else: + calendar_refresh_run_success_counter.inc() + + +def collect_extra_metrics_calendar(plugin: AbstractPlugin) -> None: + logger.info("Updating calender metrics.") + with create_session() as session: + number_of_calendars: int = session.scalar(select(func.count(Calendar.uid))) # type: ignore[assignment] + calendars_gauge.set(number_of_calendars) + + update_calendars_with_last_update_before: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency + number_of_out_dated_calendars: int = session.scalar( # type: ignore[assignment] + select(func.count(Calendar.uid)).where(Calendar.last_update_utc <= update_calendars_with_last_update_before) + ) + calendars_outdated_gauge.set(number_of_out_dated_calendars) + + number_of_calendars_with_errors: int = session.scalar( # type: ignore[assignment] + select(func.count(Calendar.uid)).where(Calendar.error_msg != None) # noqa: E711 + ) + calendars_errors_gauge.set(number_of_calendars_with_errors) + + +def enter_calendar_refresher_loop(plugin: AbstractPlugin) -> None: + last_metrics_update = datetime.now(tz=timezone.utc) - plugin.interval_worker_metrics_update + while True: + if datetime.now(tz=timezone.utc) - last_metrics_update > plugin.interval_worker_metrics_update: + collect_extra_metrics_calendar(plugin=plugin) + last_metrics_update = datetime.now(tz=timezone.utc) + + update_the_most_outdated_calendar(plugin=plugin) diff --git a/duty_board/worker_loop.py b/duty_board/worker_duty_officer.py similarity index 61% rename from duty_board/worker_loop.py rename to duty_board/worker_duty_officer.py index 34e2c2a..183d469 100644 --- a/duty_board/worker_loop.py +++ b/duty_board/worker_duty_officer.py @@ -1,20 +1,35 @@ import logging import time import traceback +from datetime import datetime, timezone from typing import Optional, Sequence, Tuple from pendulum.datetime import DateTime -from sqlalchemy import Select, Update, or_, select, update +from prometheus_client import Counter, Gauge +from sqlalchemy import Select, Update, func, or_, select, update from sqlalchemy.orm import Session as SASession from duty_board.alchemy.session import create_session -from duty_board.models.calendar import Calendar from duty_board.models.on_call_event import OnCallEvent from duty_board.models.person import Person from duty_board.plugin.abstract_plugin import AbstractPlugin logger = logging.getLogger(__name__) +persons_gauge = Gauge("duty_officers_amount", "Number of persons tracked in the database.") +persons_outdated_gauge = Gauge("duty_officers_outdated_amount", "Number of persons that require an update.") +persons_errors_gauge = Gauge("duty_officers_errors_amount", "Number of calendars that have errors.") +persons_refresh_run_counter = Counter("duty_officers_refresh_counter", "Count the number of calendar refresh runs.") +persons_refresh_run_success_counter = Counter( + "duty_officers_refresh_success_counter", "Count the number of calendar refresh runs that succeed." +) +persons_refresh_run_failed_counter = Counter( + "duty_officers_refresh_failed_counter", "Count the number of calendar refresh runs that fail." +) +persons_refresh_failed = Gauge( + "duty_officers_last_refresh_failed", "Indicate whether the last refresh failed", ["person"] +) + def get_most_outdated_person(plugin: AbstractPlugin, session: SASession) -> Optional[Person]: update_persons_with_last_update_before: DateTime = DateTime.utcnow() - plugin.person_update_frequency @@ -27,17 +42,6 @@ def get_most_outdated_person(plugin: AbstractPlugin, session: SASession) -> Opti return session.scalar(stmt) -def get_most_outdated_calendar(plugin: AbstractPlugin, session: SASession) -> Optional[Calendar]: - update_calendars_with_last_update_before: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency - stmt: Select[Tuple[Calendar]] = ( - select(Calendar) - .where(Calendar.last_update_utc <= update_calendars_with_last_update_before) - .order_by(Calendar.last_update_utc) - .limit(1) - ) - return session.scalar(stmt) - - def ensure_person_uniqueness(new_person: Person) -> Person: """ This function ensures we don't have 1000 users with the except same username and or email. @@ -75,6 +79,8 @@ def ensure_person_uniqueness(new_person: Person) -> Person: def update_the_most_outdated_person(plugin: AbstractPlugin) -> None: + persons_refresh_run_counter.inc() + failed: bool = False try: with create_session() as session: person: Optional[Person] @@ -90,6 +96,7 @@ def update_the_most_outdated_person(plugin: AbstractPlugin) -> None: person = ensure_person_uniqueness(new_person=person) person.error_msg = None except Exception: + failed = True logger.exception(f"Failed to update {person=}.") person.error_msg = traceback.format_exc() finally: @@ -97,39 +104,40 @@ def update_the_most_outdated_person(plugin: AbstractPlugin) -> None: session.merge(person) logger.info("Successfully updated the state of the person in the database.") except Exception: + failed = True logger.exception("Failed to update a person in the database. There is probably some database error.") + if person is not None: + persons_refresh_failed.labels(person.username).set(int(failed)) + if failed: + persons_refresh_run_failed_counter.inc() + else: + persons_refresh_run_success_counter.inc() -def update_the_most_outdated_calendar(plugin: AbstractPlugin) -> None: - try: - with create_session() as session: - calendar: Optional[Calendar] - if (calendar := get_most_outdated_calendar(plugin=plugin, session=session)) is None: - logger.debug("Nothing to update here :).") - time.sleep(1) # Avoid overload on the database. - return - logger.info(f"Updating {calendar=}.") - try: - calendar = plugin.sync_calendar(calendar=calendar, session=session) - logger.debug(f"Successfully executed plugins sync_calendar() for {calendar=}.") - calendar.error_msg = None - except Exception: - logger.exception(f"Failed to update {calendar=}.") - calendar.error_msg = traceback.format_exc() - finally: - calendar.last_update_utc = DateTime.utcnow() - session.merge(calendar) - logger.info("Successfully updated the state of the calendar in the database.") - except Exception: - logger.exception("Failed to update some calendar in the database. There is probably some database error.") +def collect_extra_metrics_duty_officer(plugin: AbstractPlugin) -> None: + logger.info("Updating duty officer metrics.") + with create_session() as session: + number_of_persons: int = session.scalar(select(func.count(Person.uid))) # type: ignore[assignment] + persons_gauge.set(number_of_persons) + update_persons_with_last_update_before: DateTime = DateTime.utcnow() - plugin.person_update_frequency + number_of_out_dated_persons: int = session.scalar( # type: ignore[assignment] + select(func.count(Person.uid)).where(Person.last_update_utc <= update_persons_with_last_update_before) + ) + persons_outdated_gauge.set(number_of_out_dated_persons) -def enter_calendar_refresher_loop(plugin: AbstractPlugin) -> None: - while True: - update_the_most_outdated_calendar(plugin=plugin) + number_of_persons_with_errors: int = session.scalar( # type: ignore[assignment] + select(func.count(Person.uid)).where(Person.error_msg != None) # noqa: E711 + ) + persons_errors_gauge.set(number_of_persons_with_errors) def enter_duty_officer_refresher_loop(plugin: AbstractPlugin) -> None: + last_metrics_update: datetime = datetime.now(tz=timezone.utc) - plugin.interval_worker_metrics_update while True: + if datetime.now(tz=timezone.utc) - last_metrics_update > plugin.interval_worker_metrics_update: + collect_extra_metrics_duty_officer(plugin=plugin) + last_metrics_update = datetime.now(tz=timezone.utc) + update_the_most_outdated_person(plugin=plugin) diff --git a/tests/test_worker_calendars.py b/tests/test_worker_calendars.py new file mode 100644 index 0000000..010a294 --- /dev/null +++ b/tests/test_worker_calendars.py @@ -0,0 +1,63 @@ +from datetime import datetime, timedelta +from typing import List, Optional, Tuple + +import pytest +import requests_mock +from pendulum.tz.timezone import UTC +from sqlalchemy import select +from sqlalchemy.orm.session import Session as SASession + +from duty_board import worker_calendars +from duty_board.alchemy.session import create_session +from duty_board.models.calendar import Calendar +from duty_board.models.on_call_event import OnCallEvent +from tests import ical_helper +from tests.conftest import get_loaded_ldap_plugin + + +@pytest.mark.usefixtures("_wipe_database") +def test_update_the_most_outdated_calendar() -> None: + # First run is when there is nothing to update yet :) + with get_loaded_ldap_plugin() as example_plugin: + worker_calendars.update_the_most_outdated_calendar(example_plugin) + + # Create the calendar + session: SASession + with create_session() as session: + calendar = Calendar( + uid="data_platform_duty", + name="Data Platform Duty", + description="Call us for your issues with the data platform", + icalendar_url="https://non-existing-url.com/icalendar.ics", + category="Big Data", + order=1, + last_update_utc=datetime(1970, 1, 1, 0, 0, 2, tzinfo=UTC), + sync=False, + ) + session.add(calendar) + + # Do the actual update loop + with get_loaded_ldap_plugin() as example_plugin, requests_mock.Mocker() as m: + events: List[Tuple[Optional[str], Optional[str], timedelta, timedelta]] = [ + ("jan", None, timedelta(days=0), timedelta(days=1)), + ("jan", "mailto:jan@schoenmaker.nl", timedelta(days=1), timedelta(days=1)), + ("henkietankie", "henk@tank.nl", timedelta(days=2), timedelta(days=20)), + ] + ical_response = ical_helper.get_icalendar_response(events=events) + m.get("https://non-existing-url.com/icalendar.ics", text=ical_response) + + worker_calendars.update_the_most_outdated_calendar(example_plugin) + + with create_session() as session: + on_call_events = list(session.scalars(select(OnCallEvent).order_by(OnCallEvent.start_event_utc)).all()) + assert len(on_call_events) == 3 + for on_call_event in on_call_events: + assert on_call_event.calendar_uid == "data_platform_duty" + assert on_call_events[0].person.username == "jan" + assert on_call_events[0].person.email is None + + assert on_call_events[1].person.username is None + assert on_call_events[1].person.email == "jan@schoenmaker.nl" + + assert on_call_events[2].person.username is None + assert on_call_events[2].person.email == "henk@tank.nl" diff --git a/tests/test_worker_loop.py b/tests/test_worker_duty_officer.py similarity index 58% rename from tests/test_worker_loop.py rename to tests/test_worker_duty_officer.py index 25f2e21..bd8874e 100644 --- a/tests/test_worker_loop.py +++ b/tests/test_worker_duty_officer.py @@ -1,20 +1,18 @@ from datetime import datetime, timedelta -from typing import List, Optional, Tuple +from typing import List import pytest -import requests_mock from pendulum.datetime import DateTime from pendulum.tz.timezone import UTC from sqlalchemy import select from sqlalchemy.orm.session import Session as SASession -from duty_board import worker_loop +from duty_board import worker_duty_officer from duty_board.alchemy import update_duty_calendars from duty_board.alchemy.session import create_session from duty_board.models.calendar import Calendar from duty_board.models.on_call_event import OnCallEvent from duty_board.models.person import Person -from tests import ical_helper from tests.conftest import get_loaded_ldap_plugin @@ -28,7 +26,7 @@ def test_update_the_most_outdated_person() -> None: session=session, duty_calendar_configurations=example_plugin.duty_calendar_configurations ) # First run is when there is nothing to update yet :) - worker_loop.update_the_most_outdated_person(example_plugin) + worker_duty_officer.update_the_most_outdated_person(example_plugin) # Ingest the Persons & OnCallEvents with create_session() as session: @@ -80,9 +78,9 @@ def test_update_the_most_outdated_person() -> None: # We proceed to do our three runs to update all three persons with get_loaded_ldap_plugin() as example_plugin: - worker_loop.update_the_most_outdated_person(example_plugin) - worker_loop.update_the_most_outdated_person(example_plugin) - worker_loop.update_the_most_outdated_person(example_plugin) + worker_duty_officer.update_the_most_outdated_person(example_plugin) + worker_duty_officer.update_the_most_outdated_person(example_plugin) + worker_duty_officer.update_the_most_outdated_person(example_plugin) # Here we verify that the users were filled in and user 1 and 2 were combined. with create_session() as session: @@ -93,51 +91,3 @@ def test_update_the_most_outdated_person() -> None: on_call_events = list(session.scalars(select(OnCallEvent).order_by(OnCallEvent.start_event_utc)).all()) assert on_call_events[1].person_uid == on_call_events[2].person_uid - - -@pytest.mark.usefixtures("_wipe_database") -def test_update_the_most_outdated_calendar() -> None: - # First run is when there is nothing to update yet :) - with get_loaded_ldap_plugin() as example_plugin: - worker_loop.update_the_most_outdated_calendar(example_plugin) - - # Create the calendar - session: SASession - with create_session() as session: - calendar = Calendar( - uid="data_platform_duty", - name="Data Platform Duty", - description="Call us for your issues with the data platform", - icalendar_url="https://non-existing-url.com/icalendar.ics", - category="Big Data", - order=1, - last_update_utc=datetime(1970, 1, 1, 0, 0, 2, tzinfo=UTC), - sync=False, - ) - session.add(calendar) - - # Do the actual update loop - with get_loaded_ldap_plugin() as example_plugin, requests_mock.Mocker() as m: - events: List[Tuple[Optional[str], Optional[str], timedelta, timedelta]] = [ - ("jan", None, timedelta(days=0), timedelta(days=1)), - ("jan", "mailto:jan@schoenmaker.nl", timedelta(days=1), timedelta(days=1)), - ("henkietankie", "henk@tank.nl", timedelta(days=2), timedelta(days=20)), - ] - ical_response = ical_helper.get_icalendar_response(events=events) - m.get("https://non-existing-url.com/icalendar.ics", text=ical_response) - - worker_loop.update_the_most_outdated_calendar(example_plugin) - - with create_session() as session: - on_call_events = list(session.scalars(select(OnCallEvent).order_by(OnCallEvent.start_event_utc)).all()) - assert len(on_call_events) == 3 - for on_call_event in on_call_events: - assert on_call_event.calendar_uid == "data_platform_duty" - assert on_call_events[0].person.username == "jan" - assert on_call_events[0].person.email is None - - assert on_call_events[1].person.username is None - assert on_call_events[1].person.email == "jan@schoenmaker.nl" - - assert on_call_events[2].person.username is None - assert on_call_events[2].person.email == "henk@tank.nl"