Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed Dec 10, 2024
1 parent ba7b640 commit c28a451
Show file tree
Hide file tree
Showing 27 changed files with 718 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/development/github.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ You can use https://smee.io to proxy GitHub webhook requests from the data-dev M
pyyaml
-safir>=5.0.0
+# safir>=5.0.0
+-e /home/danfuchs/src/safir
+-e /home/danfuchs/src/safir/safir
shortuuid
structlog
Expand Down
1 change: 1 addition & 0 deletions requirements/dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-c main.txt

# Testing
anys
asgi-lifespan
coverage[toml]
documenteer[guide]
Expand Down
2 changes: 1 addition & 1 deletion requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ websockets

# Uncomment this, change the branch, comment out safir above, and run make
# update-deps-no-hashes to test against an unreleased version of Safir.
# safir @ git+https://github.com/lsst-sqre/safir@tickets/DM-38272
#safir @ git+https://github.com/lsst-sqre/safir/safir@tickets/DM-38272
# Similar for rubin-nublado-client:
#rubin-nublado-client@git+https://github.com/lsst-sqre/nublado@tickets/DM-45702#subdirectory=client
6 changes: 3 additions & 3 deletions ruff-shared.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ ignore = [
"S607", # using PATH is not a security vulnerability
"SIM102", # sometimes the formatting of nested if statements is clearer
"SIM117", # sometimes nested with contexts are clearer
"TCH001", # we decided to not maintain separate TYPE_CHECKING blocks
"TCH002", # we decided to not maintain separate TYPE_CHECKING blocks
"TCH003", # we decided to not maintain separate TYPE_CHECKING blocks
"TC001", # we decided to not maintain separate TYPE_CHECKING blocks
"TC002", # we decided to not maintain separate TYPE_CHECKING blocks
"TC003", # we decided to not maintain separate TYPE_CHECKING blocks
"TD003", # we don't require issues be created for TODOs
"TID252", # if we're going to use relative imports, use them always
"TRY003", # good general advice but lint is way too aggressive
Expand Down
6 changes: 6 additions & 0 deletions src/mobu/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings, SettingsConfigDict
from safir.logging import LogLevel, Profile
from safir.metrics import MetricsConfiguration, metrics_configuration_factory

from mobu.models.flock import FlockConfig

Expand Down Expand Up @@ -237,6 +238,11 @@ class Configuration(BaseSettings):
title="Log level of the application's logger",
)

metrics: MetricsConfiguration = Field(
default_factory=metrics_configuration_factory,
title="Metrics configuration",
)

github_ci_app: GitHubCiAppConfig | None = Field(
None,
title="GitHub CI app config",
Expand Down
100 changes: 100 additions & 0 deletions src/mobu/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""App metrics events."""

from datetime import timedelta
from typing import override

from safir.dependencies.metrics import EventDependency, EventMaker
from safir.metrics import EventManager, EventPayload


class EventBase(EventPayload):
"""Attributes on every mobu event."""

flock: str | None
business: str
username: str


class NotebookBase(EventBase):
"""Attributes for all notebook-related events."""

notebook: str
repo: str
repo_ref: str


class NotebookExecution(NotebookBase):
"""Reported after a notebook is finished executing."""

duration: timedelta | None
success: bool


class NotebookCellExecution(NotebookBase):
"""Reported after a notebook cell is finished executing."""

duration: timedelta | None
cell_id: str
success: bool
contents: str


class NubladoPythonExecution(EventBase):
"""Reported after a notebook cell is finished executing."""

duration: timedelta | None
success: bool
code: str


class GitLfsCheck(EventBase):
"""Reported from Git LFS businesses."""

success: bool
duration_total: timedelta | None = None
duration_create_origin_repo: timedelta | None = None
duration_populate_origin_repo: timedelta | None = None
duration_create_checkout_repo: timedelta | None = None
duration_add_lfs_assets: timedelta | None = None
duration_add_credentials: timedelta | None = None
duration_push_lfs_tracked_assets: timedelta | None = None
duration_remove_git_credentials: timedelta | None = None
duration_verify_origin_contents: timedelta | None = None
duration_create_clone_repo: timedelta | None = None
duration_verify_asset_contents: timedelta | None = None
duration_install_lfs_to_repo: timedelta | None = None
duration_add_lfs_data: timedelta | None = None
duration_git_attribute_installation: timedelta | None = None


class TapQuery(EventBase):
"""Reported when a TAP query is executed."""

success: bool
duration: timedelta | None
sync: bool
query: str


class Events(EventMaker):
"""Container for app metrics event publishers."""

@override
async def initialize(self, manager: EventManager) -> None:
self.tap_query = await manager.create_publisher("tap_query", TapQuery)
self.git_lfs_check = await manager.create_publisher(
"git_lfs_check", GitLfsCheck
)
self.notebook_execution = await manager.create_publisher(
"notebook_execution", NotebookExecution
)
self.notebook_cell_execution = await manager.create_publisher(
"notebook_cell_execution", NotebookCellExecution
)
self.nublado_python_execution = await manager.create_publisher(
"nublado_python_execution", NubladoPythonExecution
)


# We'll call .initalize on this in our app start up
events_dependency = EventDependency(Events())
8 changes: 8 additions & 0 deletions src/mobu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .dependencies.config import config_dependency
from .dependencies.context import context_dependency
from .dependencies.github import ci_manager_dependency
from .events import events_dependency
from .handlers.external import external_router
from .handlers.github_ci_app import api_router as github_ci_app_router
from .handlers.github_refresh_app import (
Expand All @@ -48,6 +49,13 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
raise RuntimeError("MOBU_GAFAELFAWR_TOKEN was not set")

await context_dependency.initialize()

event_manager = config.metrics.make_manager(
logger=context_dependency.process_context.logger
)
await event_manager.initialize()
await events_dependency.initialize(event_manager)

await context_dependency.process_context.manager.autostart()

status_interval = timedelta(days=1)
Expand Down
23 changes: 22 additions & 1 deletion src/mobu/services/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections.abc import AsyncIterable, AsyncIterator
from datetime import timedelta
from enum import Enum
from typing import Generic, TypeVar
from typing import Generic, TypedDict, TypeVar

from httpx import AsyncClient
from safir.datetime import current_datetime
Expand All @@ -25,6 +25,12 @@
__all__ = ["Business"]


class CommonEventAttrs(TypedDict):
flock: str | None
username: str
business: str


class BusinessCommand(Enum):
"""Commands sent over the internal control queue."""

Expand Down Expand Up @@ -59,6 +65,8 @@ class Business(Generic[T], metaclass=ABCMeta):
Shared HTTP client.
logger
Logger to use to report the results of business.
flock
Flock that is running this business, if it is running in a flock.
Attributes
----------
Expand All @@ -79,14 +87,18 @@ class Business(Generic[T], metaclass=ABCMeta):
Execution timings.
stopping
Whether `stop` has been called and further execution should stop.
flock
Flock that is running this business, if it is running in a flock.
"""

def __init__(
self,
*,
options: T,
user: AuthenticatedUser,
http_client: AsyncClient,
logger: BoundLogger,
flock: str | None,
) -> None:
self.options = options
self.user = user
Expand All @@ -98,6 +110,7 @@ def __init__(
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False
self.refreshing = False
self.flock = flock

# Methods that should be overridden by child classes if needed.

Expand Down Expand Up @@ -307,6 +320,14 @@ def dump(self) -> BusinessData:
timings=self.timings.dump(),
)

def common_event_attrs(self) -> CommonEventAttrs:
"""Attributes that are on every published event."""
return {
"flock": self.flock,
"username": self.user.username,
"business": self.__class__.__name__,
}

async def _pause_no_return(self, interval: timedelta) -> None:
"""Pause for up to an interval, handling commands.
Expand Down
Loading

0 comments on commit c28a451

Please sign in to comment.