Skip to content

Commit

Permalink
Publish events
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed Dec 13, 2024
1 parent 0bf5656 commit 8549523
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 43 deletions.
16 changes: 15 additions & 1 deletion src/mobu/services/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections.abc import AsyncIterable, AsyncIterator
from datetime import timedelta
from enum import Enum
from typing import Generic, TypeVar
from typing import Generic, TypedDict, TypeVar

from httpx import AsyncClient
from safir.datetime import current_datetime
Expand All @@ -25,6 +25,12 @@
__all__ = ["Business"]


class CommonEventAttrs(TypedDict):
flock: str | None
username: str
business: str


class BusinessCommand(Enum):
"""Commands sent over the internal control queue."""

Expand Down Expand Up @@ -314,6 +320,14 @@ def dump(self) -> BusinessData:
timings=self.timings.dump(),
)

def common_event_attrs(self) -> CommonEventAttrs:
    """Build the attributes that are on every published event.

    Returns
    -------
    CommonEventAttrs
        ``flock`` taken from ``self.flock``, ``username`` taken from
        ``self.user.username``, and ``business`` set to the name of this
        object's class.
    """
    flock = self.flock
    username = self.user.username
    # Use the concrete class name so that subclasses report themselves.
    business = self.__class__.__name__
    return {"flock": flock, "username": username, "business": business}

async def _pause_no_return(self, interval: timedelta) -> None:
"""Pause for up to an interval, handling commands.
Expand Down
73 changes: 51 additions & 22 deletions src/mobu/services/business/gitlfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from httpx import AsyncClient
from structlog.stdlib import BoundLogger

from ...events import GitLfsCheck
from ...events import events_dependency as ed
from ...exceptions import ComparisonError
from ...models.business.gitlfs import GitLFSBusinessOptions
from ...models.user import AuthenticatedUser
Expand Down Expand Up @@ -73,10 +75,23 @@ async def execute(self) -> None:
created anew.
"""
self.logger.info("Running Git-LFS check...")
with self.timings.start("execute git-lfs check") as sw:
await self._git_lfs_check()
elapsed = sw.elapsed.total_seconds()
self.logger.info(f"...Git-LFS check finished after {elapsed}s")
event = GitLfsCheck(
success=False,
**self.common_event_attrs(),
)
try:
with self.timings.start("execute git-lfs check") as sw:
await self._git_lfs_check(event)
elapsed = sw.elapsed.total_seconds()
self.logger.info(f"...Git-LFS check finished after {elapsed}s")

event.duration_total = sw.elapsed
event.success = True
except:
event.success = False
raise
finally:
await ed.events.git_lfs_check.publish(event)

def _git(self, repo: Path) -> Git:
"""Return a configured Git client for a specified repo path.
Expand All @@ -90,31 +105,41 @@ def _git(self, repo: Path) -> Git:
logger=self.logger,
)

async def _git_lfs_check(self) -> None:
async def _git_lfs_check(self, event: GitLfsCheck) -> None:
self._uuid = uuid.uuid4().hex
with tempfile.TemporaryDirectory() as working_dir:
self._working_dir = Path(working_dir)
with self.timings.start("create origin repo"):
with self.timings.start("create origin repo") as sw:
await self._create_origin_repo()
with self.timings.start("populate origin repo"):
event.duration_create_origin_repo = sw.elapsed
with self.timings.start("populate origin repo") as sw:
await self._populate_origin_repo()
with self.timings.start("create checkout repo"):
event.duration_populate_origin_repo = sw.elapsed
with self.timings.start("create checkout repo") as sw:
await self._create_checkout_repo()
with self.timings.start("add LFS-tracked assets"):
await self._add_lfs_assets()
event.duration_create_checkout_repo = sw.elapsed
with self.timings.start("add LFS-tracked assets") as sw:
await self._add_lfs_assets(event)
event.duration_add_lfs_assets = sw.elapsed
git = self._git(repo=Path(self._working_dir / "checkout"))
with self.timings.start("add git credentials"):
with self.timings.start("add git credentials") as sw:
await self._add_credentials(git)
with self.timings.start("push LFS-tracked assets"):
event.duration_add_credentials = sw.elapsed
with self.timings.start("push LFS-tracked assets") as sw:
await git.push("origin", "main")
with self.timings.start("remove git credentials"):
event.duration_push_lfs_tracked_assets = sw.elapsed
with self.timings.start("remove git credentials") as sw:
Path(self._working_dir / ".git_credentials").unlink()
with self.timings.start("verify origin contents"):
event.duration_remove_git_credentials = sw.elapsed
with self.timings.start("verify origin contents") as sw:
await self._verify_origin_contents()
with self.timings.start("create clone repo with asset"):
event.duration_verify_origin_contents = sw.elapsed
with self.timings.start("create clone repo with asset") as sw:
await self._create_clone_repo()
with self.timings.start("verify asset contents"):
event.duration_create_clone_repo = sw.elapsed
with self.timings.start("verify asset contents") as sw:
await self._verify_asset_contents()
event.duration_verify_asset_contents = sw.elapsed

async def _create_origin_repo(self) -> None:
origin = Path(self._working_dir / "origin")
Expand Down Expand Up @@ -163,13 +188,16 @@ async def _populate_origin_repo(self) -> None:
await git.add(".lfsconfig")
await git.commit("-am", "Initial commit")

async def _add_lfs_assets(self) -> None:
async def _add_lfs_assets(self, event: GitLfsCheck) -> None:
checkout_path = Path(self._working_dir / "checkout")
git = self._git(repo=checkout_path)
with self.timings.start("install git lfs to checkout repo"):
with self.timings.start("install git lfs to checkout repo") as sw:
await self._install_git_lfs(git)
with self.timings.start("add lfs data to checkout repo"):
await self._add_git_lfs_data(git)
event.duration_install_lfs_to_repo = sw.elapsed
checkout_path = Path(self._working_dir / "checkout")
with self.timings.start("add lfs data to checkout repo") as sw:
await self._add_git_lfs_data(git, event)
event.duration_add_lfs_data = sw.elapsed
asset_path = Path(checkout_path / "assets")
asset_path.mkdir()
Path(asset_path / "UUID").write_text(self._uuid)
Expand All @@ -192,16 +220,17 @@ async def _install_git_lfs(self, git: Git, scope: str = "--local") -> None:
"""
await git.lfs("install", scope)

async def _add_git_lfs_data(self, git: Git) -> None:
async def _add_git_lfs_data(self, git: Git, event: GitLfsCheck) -> None:
if git.repo is None:
raise ValueError("Git client repository cannot be 'None'")
with self.timings.start("git attribute installation"):
with self.timings.start("git attribute installation") as sw:
shutil.copyfile(
Path(self._package_data / "gitattributes"),
Path(git.repo / ".gitattributes"),
)
await git.add(".gitattributes")
await git.config("--local", "lfs.url", self._lfs_write_url)
event.duration_git_attribute_installation = sw.elapsed

async def _add_credentials(self, git: Git) -> None:
credfile = Path(self._working_dir / ".git_credentials")
Expand Down
113 changes: 98 additions & 15 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import shutil
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
Expand All @@ -23,6 +24,8 @@

from ...constants import GITHUB_REPO_CONFIG_PATH
from ...dependencies.config import config_dependency
from ...events import NotebookCellExecution, NotebookExecution
from ...events import events_dependency as ed
from ...exceptions import NotebookRepositoryError, RepositoryConfigError
from ...models.business.notebookrunner import (
ListNotebookRunnerOptions,
Expand All @@ -33,12 +36,19 @@
)
from ...models.repo import RepoConfig
from ...models.user import AuthenticatedUser
from ...services.business.base import CommonEventAttrs
from ...storage.git import Git
from .nublado import NubladoBusiness

__all__ = ["NotebookRunner"]


class CommonNotebookEventAttrs(CommonEventAttrs):
    """Common event attributes plus notebook-specific ones.

    Extends ``CommonEventAttrs`` with the fields that
    ``common_notebook_event_attrs`` adds for notebook events.
    """

    # Name of the notebook, or "unknown" when no notebook is set.
    notebook: str

    # Repository URL taken from the business options.
    repo: str

    # Repository ref taken from the business options.
    repo_ref: str


class NotebookRunner(NubladoBusiness):
"""Start a Jupyter lab and run a sequence of notebooks.
Expand Down Expand Up @@ -305,26 +315,88 @@ async def execute_code(self, session: JupyterLabSession) -> None:
msg = f"Notebook {self._notebook.name} iteration {iteration}"
self.logger.info(msg)

for cell in self.read_notebook(self._notebook):
code = "".join(cell["source"])
cell_id = cell.get("id") or cell["_index"]
ctx = CodeContext(
notebook=self._notebook.name,
path=str(self._notebook),
cell=cell_id,
cell_number=f"#{cell['_index']}",
cell_source=code,
)
await self.execute_cell(session, code, cell_id, ctx)
if not await self.execution_idle():
break
with self.timings.start(
"execute_notebook", self.annotations(self._notebook.name)
) as sw:
try:
for cell in self.read_notebook(self._notebook):
code = "".join(cell["source"])
cell_id = cell.get("id") or cell["_index"]
ctx = CodeContext(
notebook=self._notebook.name,
path=str(self._notebook),
cell=cell_id,
cell_number=f"#{cell['_index']}",
cell_source=code,
)
await self.execute_cell(session, code, cell_id, ctx)
if not await self.execution_idle():
break
except:
await self._publish_notebook_failure()
raise

self.logger.info(f"Success running notebook {self._notebook.name}")
await self._publish_notebook_success(sw.elapsed)
if not self._notebook_paths:
self.logger.info("Done with this cycle of notebooks")
if self.stopping:
break

async def _publish_notebook_success(self, duration: timedelta) -> None:
    """Publish a ``NotebookExecution`` event with ``success=True``.

    Parameters
    ----------
    duration
        Elapsed time to record on the event.
    """
    publish = ed.events.notebook_execution.publish
    event = NotebookExecution(
        **self.common_notebook_event_attrs(),
        duration=duration,
        success=True,
    )
    await publish(event)

async def _publish_notebook_failure(self) -> None:
    """Publish a ``NotebookExecution`` event with ``success=False``.

    The event carries no duration (``duration=None``).
    """
    publish = ed.events.notebook_execution.publish
    event = NotebookExecution(
        **self.common_notebook_event_attrs(),
        duration=None,
        success=False,
    )
    await publish(event)

async def _publish_cell_success(
    self, *, cell_id: str, contents: str, duration: timedelta
) -> None:
    """Publish a ``NotebookCellExecution`` event with ``success=True``.

    Parameters
    ----------
    cell_id
        Identifier of the cell, recorded on the event.
    contents
        Cell contents, recorded on the event.
    duration
        Elapsed time to record on the event.
    """
    publish = ed.events.notebook_cell_execution.publish
    event = NotebookCellExecution(
        **self.common_notebook_event_attrs(),
        duration=duration,
        success=True,
        cell_id=cell_id,
        contents=contents,
    )
    await publish(event)

async def _publish_cell_failure(
    self, *, cell_id: str, contents: str
) -> None:
    """Publish a ``NotebookCellExecution`` event with ``success=False``.

    The event carries no duration (``duration=None``).

    Parameters
    ----------
    cell_id
        Identifier of the cell, recorded on the event.
    contents
        Cell contents, recorded on the event.
    """
    publish = ed.events.notebook_cell_execution.publish
    event = NotebookCellExecution(
        **self.common_notebook_event_attrs(),
        duration=None,
        success=False,
        cell_id=cell_id,
        contents=contents,
    )
    await publish(event)

def common_notebook_event_attrs(self) -> CommonNotebookEventAttrs:
    """Return notebook event attrs with the other common attrs.

    Returns
    -------
    CommonNotebookEventAttrs
        Everything from ``common_event_attrs()`` plus ``repo`` and
        ``repo_ref`` from the options and the current notebook's name.
        The name is ``"unknown"`` when no notebook is set.
    """
    if self._notebook:
        notebook_name = self._notebook.name
    else:
        notebook_name = "unknown"
    shared = self.common_event_attrs()
    return {
        **shared,
        "repo": self.options.repo_url,
        "repo_ref": self.options.repo_ref,
        "notebook": notebook_name,
    }

async def execute_cell(
self,
session: JupyterLabSession,
Expand All @@ -335,11 +407,22 @@ async def execute_cell(
if not self._notebook:
raise RuntimeError("Executing a cell without a notebook")
self.logger.info(f"Executing cell {cell_id}:\n{code}\n")
with self.timings.start("execute_cell", self.annotations(cell_id)):
with self.timings.start(
"execute_cell", self.annotations(cell_id)
) as sw:
self._running_code = code
reply = await session.run_python(code, context=context)
try:
reply = await session.run_python(code, context=context)
except:
await self._publish_cell_failure(
cell_id=cell_id, contents=code
)
raise
self._running_code = None
self.logger.info(f"Result:\n{reply}\n")
await self._publish_cell_success(
cell_id=cell_id, contents=code, duration=sw.elapsed
)

def dump(self) -> NotebookRunnerData:
return NotebookRunnerData(
Expand Down
33 changes: 31 additions & 2 deletions src/mobu/services/business/nubladopythonloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@

from __future__ import annotations

from datetime import timedelta

from httpx import AsyncClient
from rubin.nublado.client import JupyterLabSession
from structlog.stdlib import BoundLogger

from ...events import NubladoPythonExecution
from ...events import events_dependency as ed
from ...models.business.nubladopythonloop import NubladoPythonLoopOptions
from ...models.user import AuthenticatedUser
from .nublado import NubladoBusiness
Expand Down Expand Up @@ -54,8 +58,33 @@ def __init__(
async def execute_code(self, session: JupyterLabSession) -> None:
code = self.options.code
for _count in range(self.options.max_executions):
with self.timings.start("execute_code", self.annotations()):
reply = await session.run_python(code)
with self.timings.start("execute_code", self.annotations()) as sw:
try:
reply = await session.run_python(code)
except:
await self._publish_failure(code=code)
raise
self.logger.info(f"{code} -> {reply}")
await self._publish_success(code=code, duration=sw.elapsed)
if not await self.execution_idle():
break

async def _publish_success(self, code: str, duration: timedelta) -> None:
    """Publish a ``NubladoPythonExecution`` event with ``success=True``.

    Parameters
    ----------
    code
        The code to record on the event.
    duration
        Elapsed time to record on the event.
    """
    publish = ed.events.nublado_python_execution.publish
    event = NubladoPythonExecution(
        duration=duration,
        code=code,
        success=True,
        **self.common_event_attrs(),
    )
    await publish(event)

async def _publish_failure(self, code: str) -> None:
    """Publish a ``NubladoPythonExecution`` event with ``success=False``.

    The event carries no duration (``duration=None``).

    Parameters
    ----------
    code
        The code to record on the event.
    """
    publish = ed.events.nublado_python_execution.publish
    event = NubladoPythonExecution(
        duration=None,
        code=code,
        success=False,
        **self.common_event_attrs(),
    )
    await publish(event)
Loading

0 comments on commit 8549523

Please sign in to comment.