Skip to content

Commit

Permalink
✨Add metrics collection to DutyBoard
Browse files Browse the repository at this point in the history
  • Loading branch information
jorrick authored and Jorricks committed Mar 22, 2024
1 parent 09f7e36 commit 098581d
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 97 deletions.
4 changes: 4 additions & 0 deletions docker-compose/docker-compose.prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ services:
condition: service_healthy
prod_initialise:
condition: service_completed_successfully
ports:
- "8004:8000" # We expose port 8000 from the container as 8004 to the world.
build:
context: ../
dockerfile: docker-compose/python.Dockerfile
Expand All @@ -104,6 +106,8 @@ services:
condition: service_healthy
prod_initialise:
condition: service_completed_successfully
ports:
- "8005:8000" # We expose port 8000 from the container as 8005 to the world.
build:
context: ../
dockerfile: docker-compose/python.Dockerfile
Expand Down
14 changes: 11 additions & 3 deletions duty_board/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import click
from alembic.config import CommandLine
from click import Context
from prometheus_client import start_http_server

from duty_board import worker_loop
from duty_board.alchemy import update_duty_calendars
from duty_board.alchemy.session import create_session
from duty_board.plugin.abstract_plugin import AbstractPlugin
Expand Down Expand Up @@ -48,16 +48,24 @@ def update_calendars() -> None:

@cli.command()
def calendar_refresher() -> None:
# Local imports so that only the relevant prometheus-client metrics are present.
from duty_board import worker_calendars

logger.info("Starting the worker to refresh the calendars.")
plugin: AbstractPlugin = plugin_fetcher.get_plugin()
worker_loop.enter_calendar_refresher_loop(plugin)
start_http_server(port=8000)
worker_calendars.enter_calendar_refresher_loop(plugin)


@cli.command()
def duty_officer_refresher() -> None:
# Local imports so that only the relevant prometheus-client metrics are present.
from duty_board import worker_duty_officer

logger.info("Starting the worker to refresh the persons.")
plugin: AbstractPlugin = plugin_fetcher.get_plugin()
worker_loop.enter_duty_officer_refresher_loop(plugin)
start_http_server(port=8000)
worker_duty_officer.enter_duty_officer_refresher_loop(plugin)


@cli.command(name="webserver", context_settings={"ignore_unknown_options": True, "allow_extra_args": True})
Expand Down
1 change: 1 addition & 0 deletions duty_board/plugin/abstract_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AbstractPlugin(ABC):
In case you want to know more check <a href="https://github.com/Jorricks/DutyBoard">here</a>.<br>
Cheers!
"""
interval_worker_metrics_update: ClassVar[datetime.timedelta] = datetime.timedelta(seconds=30)

announcement_background_color_hex: ClassVar[str] = "#FF0000"
announcement_text_color_hex: ClassVar[str] = "#FFFFFF"
Expand Down
8 changes: 8 additions & 0 deletions duty_board/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytz
from fastapi import FastAPI, HTTPException
from prometheus_client import Gauge
from prometheus_fastapi_instrumentator import Instrumentator
from pytz.exceptions import UnknownTimeZoneError
from pytz.tzinfo import BaseTzInfo
Expand Down Expand Up @@ -47,6 +48,7 @@
Path(os.environ["PROMETHEUS_MULTIPROC_DIR"]).mkdir(exist_ok=True)
# Setup metrics collection for our FastAPI endpoints.
Instrumentator().instrument(app).expose(app)
calendar_events_gauge = Gauge("duty_events_per_calendar", "Currently planned events per calendar", ["calendar_name"])


CURRENT_DIR: Final[Path] = Path(__file__).absolute().parent
Expand Down Expand Up @@ -80,6 +82,11 @@ def _get_config_object(timezone_object: BaseTzInfo) -> _Config:
)


def collect_calendar_metrics(calendars: List[_Calendar]) -> None:
    """
    Publish the number of events of every given calendar to the per-calendar events gauge.

    :param calendars: The calendars whose event counts should be exported; each one is labelled by its name.
    """
    for cal in calendars:
        # Resolve the labelled child first, then store the event count on it.
        labelled_gauge = calendar_events_gauge.labels(cal.name)
        labelled_gauge.set(len(cal.events))


@app.get("/schedule", response_model=CurrentSchedule)
async def get_schedule(timezone: str) -> CurrentSchedule:
timezone_object = _parse_timezone_str(timezone)
Expand All @@ -91,6 +98,7 @@ async def get_schedule(timezone: str) -> CurrentSchedule:
all_encountered_person_uids=all_encountered_person_uids,
timezone=timezone_object,
)
collect_calendar_metrics(calendars=calendars)
persons: Dict[int, _PersonEssentials] = api_queries.get_peoples_essentials(
session=session,
all_person_uids=all_encountered_person_uids,
Expand Down
105 changes: 105 additions & 0 deletions duty_board/worker_calendars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
import time
import traceback
from datetime import datetime, timezone
from typing import Optional, Tuple

from pendulum.datetime import DateTime
from prometheus_client import Counter, Gauge
from sqlalchemy import Select, func, select
from sqlalchemy.orm import Session as SASession

from duty_board.alchemy.session import create_session
from duty_board.models.calendar import Calendar
from duty_board.plugin.abstract_plugin import AbstractPlugin

logger = logging.getLogger(__name__)

# Module-level Prometheus metrics of the calendar refresher worker.

# Database-wide gauges; these are set by collect_extra_metrics_calendar().
calendars_gauge = Gauge("duty_calendar_amount", "Number of calendars listed in the database.")
calendars_outdated_gauge = Gauge("duty_calendars_outdated_amount", "Number of calendars that require an update.")
calendars_errors_gauge = Gauge("duty_calendars_errors_amount", "Number of calendars that have errors.")
# Run counters; these are incremented by update_the_most_outdated_calendar().
calendar_refresh_run_counter = Counter("duty_calendars_refresh_counter", "Count the number of calendar refresh runs.")
calendar_refresh_run_success_counter = Counter(
    "duty_calendars_refresh_success_counter", "Count the number of calendar refresh runs that succeed."
)
calendar_refresh_run_failed_counter = Counter(
    "duty_calendars_refresh_failed_counter", "Count the number of calendar refresh runs that fail."
)
# Per-calendar flag, labelled by calendar name: set to 1 when its last refresh failed and to 0 otherwise.
calendar_refresh_failed = Gauge(
    "duty_calendars_last_refresh_failed", "Indicate whether the last refresh failed", ["calendar_name"]
)


def get_most_outdated_calendar(plugin: AbstractPlugin, session: SASession) -> Optional[Calendar]:
    """
    Look up the calendar that has gone the longest without a refresh.

    Only calendars whose ``last_update_utc`` is at least ``plugin.calendar_update_frequency`` in the past qualify.

    :param plugin: Plugin that supplies the calendar update frequency.
    :param session: Database session used to run the query.
    :return: The qualifying calendar with the oldest ``last_update_utc``, or None when no calendar qualifies.
    """
    refresh_cutoff: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency
    is_due_for_refresh = Calendar.last_update_utc <= refresh_cutoff
    oldest_first: Select[Tuple[Calendar]] = (
        select(Calendar).where(is_due_for_refresh).order_by(Calendar.last_update_utc)
    )
    return session.scalar(oldest_first.limit(1))


def update_the_most_outdated_calendar(plugin: AbstractPlugin) -> None:
    """
    Refresh the single most outdated calendar and record the outcome in the Prometheus metrics.

    Every call increments the refresh run counter. When no calendar is due for an update, the function sleeps
    for one second and returns without touching the success or failure counters. Otherwise the plugin's
    ``sync_calendar()`` is executed; on failure the traceback is stored in ``calendar.error_msg``. In both
    cases ``last_update_utc`` is bumped and the calendar is merged back into the session.

    :param plugin: Plugin that provides ``sync_calendar()`` and the calendar update frequency.
    """
    calendar_refresh_run_counter.inc()
    failed: bool = False
    # Bind the name before entering the try block. If create_session() or the lookup query raises, the
    # metric bookkeeping at the bottom would otherwise fail with an UnboundLocalError on `calendar`.
    calendar: Optional[Calendar] = None
    try:
        with create_session() as session:
            if (calendar := get_most_outdated_calendar(plugin=plugin, session=session)) is None:
                logger.debug("Nothing to update here :).")
                time.sleep(1)  # Avoid overload on the database.
                return

            logger.info(f"Updating {calendar=}.")
            try:
                calendar = plugin.sync_calendar(calendar=calendar, session=session)
                logger.debug(f"Successfully executed plugins sync_calendar() for {calendar=}.")
                calendar.error_msg = None
            except Exception:
                failed = True
                logger.exception(f"Failed to update {calendar=}.")
                calendar.error_msg = traceback.format_exc()
            finally:
                # Always bump the timestamp, so a failing calendar is not picked again immediately.
                calendar.last_update_utc = DateTime.utcnow()
                session.merge(calendar)
        logger.info("Successfully updated the state of the calendar in the database.")
    except Exception:
        failed = True
        logger.exception("Failed to update some calendar in the database. There is probably some database error.")

    # `calendar` is still None when the failure happened before a calendar could be selected.
    if calendar is not None:
        calendar_refresh_failed.labels(calendar.name).set(int(failed))
    if failed:
        calendar_refresh_run_failed_counter.inc()
    else:
        calendar_refresh_run_success_counter.inc()


def collect_extra_metrics_calendar(plugin: AbstractPlugin) -> None:
    """
    Update the database-wide calendar gauges.

    Three counts are queried and exported: the total number of calendars, the number of calendars whose
    ``last_update_utc`` is at least ``plugin.calendar_update_frequency`` in the past, and the number of
    calendars that currently have an ``error_msg`` stored.

    :param plugin: Plugin that supplies the calendar update frequency used to decide what counts as outdated.
    """
    logger.info("Updating calendar metrics.")
    with create_session() as session:
        # `or 0` keeps the gauge numeric should the scalar come back as None.
        number_of_calendars: int = session.scalar(select(func.count(Calendar.uid))) or 0
        calendars_gauge.set(number_of_calendars)

        update_calendars_with_last_update_before: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency
        number_of_out_dated_calendars: int = (
            session.scalar(
                select(func.count(Calendar.uid)).where(
                    Calendar.last_update_utc <= update_calendars_with_last_update_before
                )
            )
            or 0
        )
        calendars_outdated_gauge.set(number_of_out_dated_calendars)

        # `.is_not(None)` renders the same `IS NOT NULL` as `!= None`, without needing a linter exemption.
        number_of_calendars_with_errors: int = (
            session.scalar(select(func.count(Calendar.uid)).where(Calendar.error_msg.is_not(None))) or 0
        )
        calendars_errors_gauge.set(number_of_calendars_with_errors)


def enter_calendar_refresher_loop(plugin: AbstractPlugin) -> None:
    """
    Run the calendar refresher forever.

    Each iteration refreshes the most outdated calendar. In addition, the database-wide calendar metrics are
    collected whenever more than ``plugin.interval_worker_metrics_update`` has passed since the last attempt.
    This function never returns.

    :param plugin: Plugin that supplies the metrics interval and is handed to the refresh and metrics functions.
    """
    # Backdate the marker by one interval so the first collection happens right at start-up
    # instead of one full interval later.
    last_metrics_update = datetime.now(tz=timezone.utc) - plugin.interval_worker_metrics_update
    while True:
        if datetime.now(tz=timezone.utc) - last_metrics_update > plugin.interval_worker_metrics_update:
            try:
                collect_extra_metrics_calendar(plugin=plugin)
            except Exception:
                # Metrics are auxiliary: a failure here should not stop the calendars from being refreshed.
                logger.exception("Failed to collect the calendar metrics.")
            # Move the marker even after a failure, so the next attempt waits for the regular interval.
            last_metrics_update = datetime.now(tz=timezone.utc)

        update_the_most_outdated_calendar(plugin=plugin)
84 changes: 46 additions & 38 deletions duty_board/worker_loop.py → duty_board/worker_duty_officer.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
import logging
import time
import traceback
from datetime import datetime, timezone
from typing import Optional, Sequence, Tuple

from pendulum.datetime import DateTime
from sqlalchemy import Select, Update, or_, select, update
from prometheus_client import Counter, Gauge
from sqlalchemy import Select, Update, func, or_, select, update
from sqlalchemy.orm import Session as SASession

from duty_board.alchemy.session import create_session
from duty_board.models.calendar import Calendar
from duty_board.models.on_call_event import OnCallEvent
from duty_board.models.person import Person
from duty_board.plugin.abstract_plugin import AbstractPlugin

logger = logging.getLogger(__name__)

# Module-level Prometheus metrics of the duty officer (person) refresher worker.
# The help texts describe persons; the metric names themselves are unchanged.
persons_gauge = Gauge("duty_officers_amount", "Number of persons tracked in the database.")
persons_outdated_gauge = Gauge("duty_officers_outdated_amount", "Number of persons that require an update.")
persons_errors_gauge = Gauge("duty_officers_errors_amount", "Number of persons that have errors.")
persons_refresh_run_counter = Counter("duty_officers_refresh_counter", "Count the number of person refresh runs.")
persons_refresh_run_success_counter = Counter(
    "duty_officers_refresh_success_counter", "Count the number of person refresh runs that succeed."
)
persons_refresh_run_failed_counter = Counter(
    "duty_officers_refresh_failed_counter", "Count the number of person refresh runs that fail."
)
# Per-person flag, labelled by person: set to 1 when the last refresh failed and to 0 otherwise.
persons_refresh_failed = Gauge(
    "duty_officers_last_refresh_failed", "Indicate whether the last refresh failed", ["person"]
)


def get_most_outdated_person(plugin: AbstractPlugin, session: SASession) -> Optional[Person]:
update_persons_with_last_update_before: DateTime = DateTime.utcnow() - plugin.person_update_frequency
Expand All @@ -27,17 +42,6 @@ def get_most_outdated_person(plugin: AbstractPlugin, session: SASession) -> Opti
return session.scalar(stmt)


def get_most_outdated_calendar(plugin: AbstractPlugin, session: SASession) -> Optional[Calendar]:
update_calendars_with_last_update_before: DateTime = DateTime.utcnow() - plugin.calendar_update_frequency
stmt: Select[Tuple[Calendar]] = (
select(Calendar)
.where(Calendar.last_update_utc <= update_calendars_with_last_update_before)
.order_by(Calendar.last_update_utc)
.limit(1)
)
return session.scalar(stmt)


def ensure_person_uniqueness(new_person: Person) -> Person:
"""
This function ensures we don't have 1000 users with the exact same username and/or email.
Expand Down Expand Up @@ -75,6 +79,8 @@ def ensure_person_uniqueness(new_person: Person) -> Person:


def update_the_most_outdated_person(plugin: AbstractPlugin) -> None:
persons_refresh_run_counter.inc()
failed: bool = False
try:
with create_session() as session:
person: Optional[Person]
Expand All @@ -90,46 +96,48 @@ def update_the_most_outdated_person(plugin: AbstractPlugin) -> None:
person = ensure_person_uniqueness(new_person=person)
person.error_msg = None
except Exception:
failed = True
logger.exception(f"Failed to update {person=}.")
person.error_msg = traceback.format_exc()
finally:
person.last_update_utc = DateTime.utcnow()
session.merge(person)
logger.info("Successfully updated the state of the person in the database.")
except Exception:
failed = True
logger.exception("Failed to update a person in the database. There is probably some database error.")

if person is not None:
persons_refresh_failed.labels(person.username).set(int(failed))
if failed:
persons_refresh_run_failed_counter.inc()
else:
persons_refresh_run_success_counter.inc()

def update_the_most_outdated_calendar(plugin: AbstractPlugin) -> None:
try:
with create_session() as session:
calendar: Optional[Calendar]
if (calendar := get_most_outdated_calendar(plugin=plugin, session=session)) is None:
logger.debug("Nothing to update here :).")
time.sleep(1) # Avoid overload on the database.
return

logger.info(f"Updating {calendar=}.")
try:
calendar = plugin.sync_calendar(calendar=calendar, session=session)
logger.debug(f"Successfully executed plugins sync_calendar() for {calendar=}.")
calendar.error_msg = None
except Exception:
logger.exception(f"Failed to update {calendar=}.")
calendar.error_msg = traceback.format_exc()
finally:
calendar.last_update_utc = DateTime.utcnow()
session.merge(calendar)
logger.info("Successfully updated the state of the calendar in the database.")
except Exception:
logger.exception("Failed to update some calendar in the database. There is probably some database error.")
def collect_extra_metrics_duty_officer(plugin: AbstractPlugin) -> None:
logger.info("Updating duty officer metrics.")
with create_session() as session:
number_of_persons: int = session.scalar(select(func.count(Person.uid))) # type: ignore[assignment]
persons_gauge.set(number_of_persons)

update_persons_with_last_update_before: DateTime = DateTime.utcnow() - plugin.person_update_frequency
number_of_out_dated_persons: int = session.scalar( # type: ignore[assignment]
select(func.count(Person.uid)).where(Person.last_update_utc <= update_persons_with_last_update_before)
)
persons_outdated_gauge.set(number_of_out_dated_persons)

def enter_calendar_refresher_loop(plugin: AbstractPlugin) -> None:
while True:
update_the_most_outdated_calendar(plugin=plugin)
number_of_persons_with_errors: int = session.scalar( # type: ignore[assignment]
select(func.count(Person.uid)).where(Person.error_msg != None) # noqa: E711
)
persons_errors_gauge.set(number_of_persons_with_errors)


def enter_duty_officer_refresher_loop(plugin: AbstractPlugin) -> None:
    """
    Run the duty officer refresher forever.

    Each iteration refreshes the most outdated person. In addition, the database-wide duty officer metrics are
    collected whenever more than ``plugin.interval_worker_metrics_update`` has passed since the last attempt.
    This function never returns.

    :param plugin: Plugin that supplies the metrics interval and is handed to the refresh and metrics functions.
    """
    # Backdate the marker by one interval so the first collection happens right at start-up
    # instead of one full interval later.
    last_metrics_update: datetime = datetime.now(tz=timezone.utc) - plugin.interval_worker_metrics_update
    while True:
        if datetime.now(tz=timezone.utc) - last_metrics_update > plugin.interval_worker_metrics_update:
            try:
                collect_extra_metrics_duty_officer(plugin=plugin)
            except Exception:
                # Metrics are auxiliary: a failure here should not stop the persons from being refreshed.
                logger.exception("Failed to collect the duty officer metrics.")
            # Move the marker even after a failure, so the next attempt waits for the regular interval.
            last_metrics_update = datetime.now(tz=timezone.utc)

        update_the_most_outdated_person(plugin=plugin)
Loading

0 comments on commit 098581d

Please sign in to comment.