From 85495234186a9368b88dd5736fd6cac04f5e51ac Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Fri, 13 Dec 2024 10:40:02 -0600 Subject: [PATCH] Publish events --- src/mobu/services/business/base.py | 16 ++- src/mobu/services/business/gitlfs.py | 73 +++++++---- src/mobu/services/business/notebookrunner.py | 113 +++++++++++++++--- .../services/business/nubladopythonloop.py | 33 ++++- src/mobu/services/business/tap.py | 14 +++ tests/business/gitlfs_test.py | 56 +++++++++ tests/business/notebookrunner_test.py | 112 +++++++++++++++++ tests/business/nubladopythonloop_test.py | 51 ++++++++ tests/business/tapqueryrunner_test.py | 19 +++ tests/business/tapquerysetrunner_test.py | 44 ++++++- tests/support/gitlfs.py | 5 +- 11 files changed, 493 insertions(+), 43 deletions(-) diff --git a/src/mobu/services/business/base.py b/src/mobu/services/business/base.py index 54a8c4c4..c4cbc348 100644 --- a/src/mobu/services/business/base.py +++ b/src/mobu/services/business/base.py @@ -8,7 +8,7 @@ from collections.abc import AsyncIterable, AsyncIterator from datetime import timedelta from enum import Enum -from typing import Generic, TypeVar +from typing import Generic, TypedDict, TypeVar from httpx import AsyncClient from safir.datetime import current_datetime @@ -25,6 +25,12 @@ __all__ = ["Business"] +class CommonEventAttrs(TypedDict): + flock: str | None + username: str + business: str + + class BusinessCommand(Enum): """Commands sent over the internal control queue.""" @@ -314,6 +320,14 @@ def dump(self) -> BusinessData: timings=self.timings.dump(), ) + def common_event_attrs(self) -> CommonEventAttrs: + """Attributes that are on every published event.""" + return { + "flock": self.flock, + "username": self.user.username, + "business": self.__class__.__name__, + } + async def _pause_no_return(self, interval: timedelta) -> None: """Pause for up to an interval, handling commands. diff --git a/src/mobu/services/business/gitlfs.py b/src/mobu/services/business/gitlfs.py index 186dbc41..858753da 100644 --- a/src/mobu/services/business/gitlfs.py +++ b/src/mobu/services/business/gitlfs.py @@ -10,6 +10,8 @@ from httpx import AsyncClient from structlog.stdlib import BoundLogger +from ...events import GitLfsCheck +from ...events import events_dependency as ed from ...exceptions import ComparisonError from ...models.business.gitlfs import GitLFSBusinessOptions from ...models.user import AuthenticatedUser @@ -73,10 +75,23 @@ async def execute(self) -> None: created anew. """ self.logger.info("Running Git-LFS check...") - with self.timings.start("execute git-lfs check") as sw: - await self._git_lfs_check() - elapsed = sw.elapsed.total_seconds() - self.logger.info(f"...Git-LFS check finished after {elapsed}s") + event = GitLfsCheck( + success=False, + **self.common_event_attrs(), + ) + try: + with self.timings.start("execute git-lfs check") as sw: + await self._git_lfs_check(event) + elapsed = sw.elapsed.total_seconds() + self.logger.info(f"...Git-LFS check finished after {elapsed}s") + + event.duration_total = sw.elapsed + event.success = True + except: + event.success = False + raise + finally: + await ed.events.git_lfs_check.publish(event) def _git(self, repo: Path) -> Git: """Return a configured Git client for a specified repo path. @@ -90,31 +105,41 @@ def _git(self, repo: Path) -> Git: logger=self.logger, ) - async def _git_lfs_check(self) -> None: + async def _git_lfs_check(self, event: GitLfsCheck) -> None: self._uuid = uuid.uuid4().hex with tempfile.TemporaryDirectory() as working_dir: self._working_dir = Path(working_dir) - with self.timings.start("create origin repo"): + with self.timings.start("create origin repo") as sw: await self._create_origin_repo() - with self.timings.start("populate origin repo"): + event.duration_create_origin_repo = sw.elapsed + with self.timings.start("populate origin repo") as sw: await self._populate_origin_repo() - with self.timings.start("create checkout repo"): + event.duration_populate_origin_repo = sw.elapsed + with self.timings.start("create checkout repo") as sw: await self._create_checkout_repo() - with self.timings.start("add LFS-tracked assets"): - await self._add_lfs_assets() + event.duration_create_checkout_repo = sw.elapsed + with self.timings.start("add LFS-tracked assets") as sw: + await self._add_lfs_assets(event) + event.duration_add_lfs_assets = sw.elapsed git = self._git(repo=Path(self._working_dir / "checkout")) - with self.timings.start("add git credentials"): + with self.timings.start("add git credentials") as sw: await self._add_credentials(git) - with self.timings.start("push LFS-tracked assets"): + event.duration_add_credentials = sw.elapsed + with self.timings.start("push LFS-tracked assets") as sw: await git.push("origin", "main") - with self.timings.start("remove git credentials"): + event.duration_push_lfs_tracked_assets = sw.elapsed + with self.timings.start("remove git credentials") as sw: Path(self._working_dir / ".git_credentials").unlink() - with self.timings.start("verify origin contents"): + event.duration_remove_git_credentials = sw.elapsed + with self.timings.start("verify origin contents") as sw: await self._verify_origin_contents() - with self.timings.start("create clone repo with asset"): + event.duration_verify_origin_contents = sw.elapsed + with self.timings.start("create clone repo with asset") as sw: await self._create_clone_repo() - with self.timings.start("verify asset contents"): + event.duration_create_clone_repo = sw.elapsed + with self.timings.start("verify asset contents") as sw: await self._verify_asset_contents() + event.duration_verify_asset_contents = sw.elapsed async def _create_origin_repo(self) -> None: origin = Path(self._working_dir / "origin") @@ -163,13 +188,16 @@ async def _populate_origin_repo(self) -> None: await git.add(".lfsconfig") await git.commit("-am", "Initial commit") - async def _add_lfs_assets(self) -> None: + async def _add_lfs_assets(self, event: GitLfsCheck) -> None: checkout_path = Path(self._working_dir / "checkout") git = self._git(repo=checkout_path) - with self.timings.start("install git lfs to checkout repo"): + with self.timings.start("install git lfs to checkout repo") as sw: await self._install_git_lfs(git) - with self.timings.start("add lfs data to checkout repo"): - await self._add_git_lfs_data(git) + event.duration_install_lfs_to_repo = sw.elapsed + checkout_path = Path(self._working_dir / "checkout") + with self.timings.start("add lfs data to checkout repo") as sw: + await self._add_git_lfs_data(git, event) + event.duration_add_lfs_data = sw.elapsed asset_path = Path(checkout_path / "assets") asset_path.mkdir() Path(asset_path / "UUID").write_text(self._uuid) @@ -192,16 +220,17 @@ async def _install_git_lfs(self, git: Git, scope: str = "--local") -> None: """ await git.lfs("install", scope) - async def _add_git_lfs_data(self, git: Git) -> None: + async def _add_git_lfs_data(self, git: Git, event: GitLfsCheck) -> None: if git.repo is None: raise ValueError("Git client repository cannot be 'None'") - with self.timings.start("git attribute installation"): + with self.timings.start("git attribute installation") as sw: shutil.copyfile( Path(self._package_data / "gitattributes"), Path(git.repo / ".gitattributes"), ) await git.add(".gitattributes") await git.config("--local", "lfs.url", self._lfs_write_url) + event.duration_git_attribute_installation = sw.elapsed async def _add_credentials(self, git: Git) -> None: credfile = Path(self._working_dir / ".git_credentials") diff --git a/src/mobu/services/business/notebookrunner.py b/src/mobu/services/business/notebookrunner.py index 6cf46010..307f5141 100644 --- a/src/mobu/services/business/notebookrunner.py +++ b/src/mobu/services/business/notebookrunner.py @@ -11,6 +11,7 @@ import shutil from collections.abc import AsyncIterator from contextlib import asynccontextmanager +from datetime import timedelta from pathlib import Path from tempfile import TemporaryDirectory from typing import Any @@ -23,6 +24,8 @@ from ...constants import GITHUB_REPO_CONFIG_PATH from ...dependencies.config import config_dependency +from ...events import NotebookCellExecution, NotebookExecution +from ...events import events_dependency as ed from ...exceptions import NotebookRepositoryError, RepositoryConfigError from ...models.business.notebookrunner import ( ListNotebookRunnerOptions, @@ -33,12 +36,19 @@ ) from ...models.repo import RepoConfig from ...models.user import AuthenticatedUser +from ...services.business.base import CommonEventAttrs from ...storage.git import Git from .nublado import NubladoBusiness __all__ = ["NotebookRunner"] +class CommonNotebookEventAttrs(CommonEventAttrs): + notebook: str + repo: str + repo_ref: str + + class NotebookRunner(NubladoBusiness): """Start a Jupyter lab and run a sequence of notebooks. @@ -305,26 +315,88 @@ async def execute_code(self, session: JupyterLabSession) -> None: msg = f"Notebook {self._notebook.name} iteration {iteration}" self.logger.info(msg) - for cell in self.read_notebook(self._notebook): - code = "".join(cell["source"]) - cell_id = cell.get("id") or cell["_index"] - ctx = CodeContext( - notebook=self._notebook.name, - path=str(self._notebook), - cell=cell_id, - cell_number=f"#{cell['_index']}", - cell_source=code, - ) - await self.execute_cell(session, code, cell_id, ctx) - if not await self.execution_idle(): - break + with self.timings.start( + "execute_notebook", self.annotations(self._notebook.name) + ) as sw: + try: + for cell in self.read_notebook(self._notebook): + code = "".join(cell["source"]) + cell_id = cell.get("id") or cell["_index"] + ctx = CodeContext( + notebook=self._notebook.name, + path=str(self._notebook), + cell=cell_id, + cell_number=f"#{cell['_index']}", + cell_source=code, + ) + await self.execute_cell(session, code, cell_id, ctx) + if not await self.execution_idle(): + break + except: + await self._publish_notebook_failure() + raise self.logger.info(f"Success running notebook {self._notebook.name}") + await self._publish_notebook_success(sw.elapsed) if not self._notebook_paths: self.logger.info("Done with this cycle of notebooks") if self.stopping: break + async def _publish_notebook_success(self, duration: timedelta) -> None: + await ed.events.notebook_execution.publish( + NotebookExecution( + **self.common_notebook_event_attrs(), + duration=duration, + success=True, + ) + ) + + async def _publish_notebook_failure(self) -> None: + await ed.events.notebook_execution.publish( + NotebookExecution( + **self.common_notebook_event_attrs(), + duration=None, + success=False, + ) + ) + + async def _publish_cell_success( + self, *, cell_id: str, contents: str, duration: timedelta + ) -> None: + await ed.events.notebook_cell_execution.publish( + NotebookCellExecution( + **self.common_notebook_event_attrs(), + duration=duration, + success=True, + cell_id=cell_id, + contents=contents, + ) + ) + + async def _publish_cell_failure( + self, *, cell_id: str, contents: str + ) -> None: + await ed.events.notebook_cell_execution.publish( + NotebookCellExecution( + **self.common_notebook_event_attrs(), + duration=None, + success=False, + cell_id=cell_id, + contents=contents, + ) + ) + + def common_notebook_event_attrs(self) -> CommonNotebookEventAttrs: + """Return notebook event attrs with the other common attrs.""" + notebook = self._notebook.name if self._notebook else "unknown" + return { + **self.common_event_attrs(), + "repo": self.options.repo_url, + "repo_ref": self.options.repo_ref, + "notebook": notebook, + } + async def execute_cell( self, session: JupyterLabSession, @@ -335,11 +407,22 @@ async def execute_cell( if not self._notebook: raise RuntimeError("Executing a cell without a notebook") self.logger.info(f"Executing cell {cell_id}:\n{code}\n") - with self.timings.start("execute_cell", self.annotations(cell_id)): + with self.timings.start( + "execute_cell", self.annotations(cell_id) + ) as sw: self._running_code = code - reply = await session.run_python(code, context=context) + try: + reply = await session.run_python(code, context=context) + except: + await self._publish_cell_failure( + cell_id=cell_id, contents=code + ) + raise self._running_code = None self.logger.info(f"Result:\n{reply}\n") + await self._publish_cell_success( + cell_id=cell_id, contents=code, duration=sw.elapsed + ) def dump(self) -> NotebookRunnerData: return NotebookRunnerData( diff --git a/src/mobu/services/business/nubladopythonloop.py b/src/mobu/services/business/nubladopythonloop.py index 2aedec7b..58420503 100644 --- a/src/mobu/services/business/nubladopythonloop.py +++ b/src/mobu/services/business/nubladopythonloop.py @@ -6,10 +6,14 @@ from __future__ import annotations +from datetime import timedelta + from httpx import AsyncClient from rubin.nublado.client import JupyterLabSession from structlog.stdlib import BoundLogger +from ...events import NubladoPythonExecution +from ...events import events_dependency as ed from ...models.business.nubladopythonloop import NubladoPythonLoopOptions from ...models.user import AuthenticatedUser from .nublado import NubladoBusiness @@ -54,8 +58,33 @@ def __init__( async def execute_code(self, session: JupyterLabSession) -> None: code = self.options.code for _count in range(self.options.max_executions): - with self.timings.start("execute_code", self.annotations()): - reply = await session.run_python(code) + with self.timings.start("execute_code", self.annotations()) as sw: + try: + reply = await session.run_python(code) + except: + await self._publish_failure(code=code) + raise self.logger.info(f"{code} -> {reply}") + await self._publish_success(code=code, duration=sw.elapsed) if not await self.execution_idle(): break + + async def _publish_success(self, code: str, duration: timedelta) -> None: + await ed.events.nublado_python_execution.publish( + NubladoPythonExecution( + duration=duration, + code=code, + success=True, + **self.common_event_attrs(), + ) + ) + + async def _publish_failure(self, code: str) -> None: + await ed.events.nublado_python_execution.publish( + NubladoPythonExecution( + duration=None, + code=code, + success=False, + **self.common_event_attrs(), + ) + ) diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py index 03cbaeec..0898dc45 100644 --- a/src/mobu/services/business/tap.py +++ b/src/mobu/services/business/tap.py @@ -13,6 +13,8 @@ from structlog.stdlib import BoundLogger from ...dependencies.config import config_dependency +from ...events import TapQuery +from ...events import events_dependency as ed from ...exceptions import CodeExecutionError, TAPClientError from ...models.business.tap import TAPBusinessData, TAPBusinessOptions from ...models.user import AuthenticatedUser @@ -82,11 +84,13 @@ async def execute(self) -> None: with self.timings.start("execute_query", {"query": query}) as sw: self._running_query = query + success = False try: if self.options.sync: await self.run_sync_query(query) else: await self.run_async_query(query) + success = True except Exception as e: raise CodeExecutionError( user=self.user.username, @@ -96,6 +100,16 @@ async def execute(self) -> None: started_at=sw.start_time, error=f"{type(e).__name__}: {e!s}", ) from e + finally: + await ed.events.tap_query.publish( + payload=TapQuery( + success=success, + duration=sw.elapsed, + sync=self.options.sync, + query=query, + **self.common_event_attrs(), + ) + ) self._running_query = None elapsed = sw.elapsed.total_seconds() diff --git a/tests/business/gitlfs_test.py b/tests/business/gitlfs_test.py index 736beec9..eac45e89 100644 --- a/tests/business/gitlfs_test.py +++ b/tests/business/gitlfs_test.py @@ -1,10 +1,14 @@ """Tests for GitLFS.""" +from typing import cast from unittest.mock import ANY import pytest import respx from httpx import AsyncClient +from safir.metrics import NOT_NONE, MockEventPublisher + +from mobu.events import events_dependency as ed from ..support.gafaelfawr import mock_gafaelfawr from ..support.gitlfs import flock_message @@ -49,6 +53,32 @@ async def test_run( assert "Running Git-LFS check..." in r.text assert "Git-LFS check finished after " in r.text + published = cast(MockEventPublisher, ed.events.git_lfs_check).published + published.assert_published_all( + [ + { + "business": "GitLFSBusiness", + "duration_add_credentials": NOT_NONE, + "duration_add_lfs_assets": NOT_NONE, + "duration_add_lfs_data": NOT_NONE, + "duration_create_checkout_repo": NOT_NONE, + "duration_create_clone_repo": NOT_NONE, + "duration_create_origin_repo": NOT_NONE, + "duration_git_attribute_installation": ANY, + "duration_install_lfs_to_repo": NOT_NONE, + "duration_populate_origin_repo": NOT_NONE, + "duration_push_lfs_tracked_assets": NOT_NONE, + "duration_remove_git_credentials": NOT_NONE, + "duration_total": NOT_NONE, + "duration_verify_asset_contents": NOT_NONE, + "duration_verify_origin_contents": NOT_NONE, + "flock": "test", + "success": True, + "username": "bot-mobu-testuser1", + } + ] + ) + @pytest.mark.asyncio async def test_fail(client: AsyncClient, respx_mock: respx.Router) -> None: @@ -88,3 +118,29 @@ async def test_fail(client: AsyncClient, respx_mock: respx.Router) -> None: assert r.status_code == 200 assert "Running Git-LFS check..." in r.text assert ("mobu.exceptions.SubprocessError") in r.text + + published = cast(MockEventPublisher, ed.events.git_lfs_check).published + published.assert_published_all( + [ + { + "business": "GitLFSBusiness", + "duration_add_credentials": NOT_NONE, + "duration_add_lfs_assets": NOT_NONE, + "duration_add_lfs_data": NOT_NONE, + "duration_create_checkout_repo": NOT_NONE, + "duration_create_clone_repo": None, + "duration_create_origin_repo": NOT_NONE, + "duration_git_attribute_installation": NOT_NONE, + "duration_install_lfs_to_repo": NOT_NONE, + "duration_populate_origin_repo": NOT_NONE, + "duration_push_lfs_tracked_assets": None, + "duration_remove_git_credentials": None, + "duration_total": None, + "duration_verify_asset_contents": None, + "duration_verify_origin_contents": None, + "flock": "test", + "success": False, + "username": "bot-mobu-testuser1", + } + ] + ) diff --git a/tests/business/notebookrunner_test.py b/tests/business/notebookrunner_test.py index a584bc0c..8aa7f00c 100644 --- a/tests/business/notebookrunner_test.py +++ b/tests/business/notebookrunner_test.py @@ -5,14 +5,18 @@ import os import shutil from pathlib import Path +from typing import cast from unittest.mock import ANY import pytest import respx +from anys import AnySearch from httpx import AsyncClient from rubin.nublado.client.testing import MockJupyter +from safir.metrics import NOT_NONE, MockEventPublisher from safir.testing.slack import MockSlackWebhook +from mobu.events import events_dependency as ed from mobu.storage.git import Git from ..support.constants import TEST_DATA_DIR @@ -111,6 +115,50 @@ async def test_run( # Make sure mobu ran all of the notebooks it thinks it should have assert "Done with this cycle of notebooks" in r.text + # Check events + common = { + "business": "NotebookRunner", + "duration": NOT_NONE, + "flock": "test", + "notebook": AnySearch("test-notebook.ipynb$"), + "repo": AnySearch("/notebooks$"), + "repo_ref": "main", + "success": True, + "username": "bot-mobu-testuser1", + } + pub_notebook = cast( + MockEventPublisher, ed.events.notebook_execution + ).published + pub_notebook.assert_published_all([common]) + + pub_cell = cast( + MockEventPublisher, + ed.events.notebook_cell_execution, + ).published + pub_cell.assert_published_all( + [ + item | common + for item in [ + { + "cell_id": "f84f0959", + "contents": 'print("This is a test")', + }, + { + "cell_id": "44ada997", + "contents": 'print("This is another test")', + }, + { + "cell_id": "53a941a4", + "contents": 'print("Final test")', + }, + { + "cell_id": "823560c6", + "contents": "", + }, + ] + ] + ) + @pytest.mark.asyncio async def test_run_debug_log( @@ -280,6 +328,40 @@ async def test_run_recursive( # Make sure mobu ran all of the notebooks it thinks it should have assert "Done with this cycle of notebooks" in r.text + # Check events + common = { + "business": "NotebookRunner", + "duration": NOT_NONE, + "flock": "test", + "repo": AnySearch("/notebooks$"), + "repo_ref": "main", + "success": True, + "username": "bot-mobu-testuser1", + } + published = cast( + MockEventPublisher, ed.events.notebook_execution + ).published + published.assert_published_all( + [ + item | common + for item in [ + { + "notebook": AnySearch("test-some-other-dir.ipynb$"), + }, + { + "notebook": AnySearch("test-some-dir-notebook.ipynb$"), + }, + { + "notebook": AnySearch("test-notebook.ipynb$"), + }, + { + "notebook": AnySearch("test-double-nested-dir.ipynb$"), + }, + ] + ], + any_order=True, + ) + @pytest.mark.asyncio async def test_run_required_services( @@ -946,3 +1028,33 @@ async def test_alert( ] error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"] assert "KeyError: 'nothing'" in error + + # Check events + common = { + "business": "NotebookRunner", + "duration": None, + "flock": "test", + "notebook": AnySearch("exception.ipynb"), + "repo": AnySearch("/notebooks"), + "repo_ref": "main", + "username": "bot-mobu-testuser1", + } + pub_notebook = cast( + MockEventPublisher, ed.events.notebook_execution + ).published + pub_notebook.assert_published_all([{"success": False} | common]) + + pub_cell = cast( + MockEventPublisher, + ed.events.notebook_cell_execution, + ).published + pub_cell.assert_published_all( + [ + common + | { + "cell_id": "ed399c0a", + "contents": 'foo = {"bar": "baz"}\nfoo["nothing"]', + "success": False, + } + ] + ) diff --git a/tests/business/nubladopythonloop_test.py b/tests/business/nubladopythonloop_test.py index df6bd5ad..71c92707 100644 --- a/tests/business/nubladopythonloop_test.py +++ b/tests/business/nubladopythonloop_test.py @@ -4,6 +4,7 @@ import asyncio import re +from typing import cast from unittest.mock import ANY from urllib.parse import urljoin @@ -15,9 +16,11 @@ JupyterState, MockJupyter, ) +from safir.metrics import NOT_NONE, MockEventPublisher from safir.testing.slack import MockSlackWebhook from mobu.dependencies.config import config_dependency +from mobu.events import events_dependency as ed from ..support.gafaelfawr import mock_gafaelfawr from ..support.util import wait_for_business @@ -84,6 +87,38 @@ async def test_run( r = await client.delete("/mobu/flocks/test") assert r.status_code == 204 + # Check events + publisher = cast(MockEventPublisher, ed.events.nublado_python_execution) + published = publisher.published + published.assert_published_all( + [ + { + "business": "NubladoPythonLoop", + "code": 'print(2+2, end="")', + "duration": NOT_NONE, + "flock": "test", + "success": True, + "username": "bot-mobu-testuser1", + }, + { + "business": "NubladoPythonLoop", + "code": 'print(2+2, end="")', + "duration": NOT_NONE, + "flock": "test", + "success": True, + "username": "bot-mobu-testuser1", + }, + { + "business": "NubladoPythonLoop", + "code": 'print(2+2, end="")', + "duration": NOT_NONE, + "flock": "test", + "success": True, + "username": "bot-mobu-testuser1", + }, + ] + ) + @pytest.mark.asyncio async def test_reuse_lab( @@ -708,6 +743,22 @@ async def test_code_exception( error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"] assert "Exception: some error" in error + # Check events + publisher = cast(MockEventPublisher, ed.events.nublado_python_execution) + published = publisher.published + published.assert_published_all( + [ + { + "business": "NubladoPythonLoop", + "code": 'raise Exception("some error")', + "duration": None, + "flock": "test", + "success": False, + "username": "bot-mobu-testuser1", + } + ] + ) + @pytest.mark.asyncio async def test_long_error( diff --git a/tests/business/tapqueryrunner_test.py b/tests/business/tapqueryrunner_test.py index 412f01c6..4cb5d640 100644 --- a/tests/business/tapqueryrunner_test.py +++ b/tests/business/tapqueryrunner_test.py @@ -2,12 +2,16 @@ from __future__ import annotations +from typing import cast from unittest.mock import ANY, patch import pytest import pyvo import respx from httpx import AsyncClient +from safir.metrics import NOT_NONE, MockEventPublisher + +from mobu.events import events_dependency as ed from ..support.gafaelfawr import mock_gafaelfawr from ..support.util import wait_for_business @@ -68,3 +72,18 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: found = True assert found, "Ran one of the appropriate queries" assert "Query finished after " in r.text + + published = cast(MockEventPublisher, ed.events.tap_query).published + published.assert_published_all( + [ + { + "business": "TAPQueryRunner", + "duration": NOT_NONE, + "flock": "test", + "query": NOT_NONE, + "success": True, + "sync": True, + "username": "bot-mobu-testuser1", + } + ] + ) diff --git a/tests/business/tapquerysetrunner_test.py b/tests/business/tapquerysetrunner_test.py index 2a1371eb..3787595e 100644 --- a/tests/business/tapquerysetrunner_test.py +++ b/tests/business/tapquerysetrunner_test.py @@ -13,9 +13,11 @@ import yaml from httpx import AsyncClient from safir.dependencies.http_client import http_client_dependency +from safir.metrics import NOT_NONE, MockEventPublisher from safir.testing.slack import MockSlackWebhook import mobu +from mobu.events import events_dependency as ed from mobu.models.business.tapquerysetrunner import TAPQuerySetRunnerOptions from mobu.models.user import AuthenticatedUser from mobu.services.business.tapquerysetrunner import TAPQuerySetRunner @@ -68,6 +70,22 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: assert "Running (sync): " in r.text assert "Query finished after " in r.text + # Confirm metrics events + published = cast(MockEventPublisher, ed.events.tap_query).published + published.assert_published_all( + [ + { + "business": "TAPQuerySetRunner", + "duration": NOT_NONE, + "flock": "test", + "query": NOT_NONE, + "success": True, + "sync": True, + "username": "bot-mobu-testuser1", + } + ] + ) + @pytest.mark.asyncio async def test_setup_error( @@ -146,7 +164,7 @@ async def test_setup_error( @pytest.mark.asyncio -async def test_alert( +async def test_failure( client: AsyncClient, slack: MockSlackWebhook, respx_mock: respx.Router ) -> None: mock_gafaelfawr(respx_mock) @@ -170,6 +188,22 @@ async def test_alert( data = await wait_for_business(client, "bot-mobu-testuser1") assert data["business"]["failure_count"] == 1 + # Confirm metrics events + published = cast(MockEventPublisher, ed.events.tap_query).published + published.assert_published_all( + [ + { + "business": "TAPQuerySetRunner", + "duration": NOT_NONE, + "flock": "test", + "query": NOT_NONE, + "success": False, + "sync": True, + "username": "bot-mobu-testuser1", + } + ] + ) + assert slack.messages == [ { "blocks": [ @@ -257,7 +291,13 @@ async def test_random_object() -> None: options = TAPQuerySetRunnerOptions(query_set=query_set) http_client = await http_client_dependency() with patch.object(pyvo.dal, "TAPService"): - runner = TAPQuerySetRunner(options, user, http_client, logger) + runner = TAPQuerySetRunner( + options=options, + user=user, + http_client=http_client, + logger=logger, + flock=None, + ) parameters = runner._generate_parameters() assert parameters["object"] in objects diff --git a/tests/support/gitlfs.py b/tests/support/gitlfs.py index ceabb409..130261d4 100644 --- a/tests/support/gitlfs.py +++ b/tests/support/gitlfs.py @@ -3,6 +3,7 @@ from contextlib import suppress from pathlib import Path +from mobu.events import GitLfsCheck from mobu.exceptions import ComparisonError, SubprocessError from mobu.services.business.gitlfs import GitLFSBusiness from mobu.storage.git import Git @@ -37,7 +38,9 @@ async def uninstall_git_lfs( await git.lfs("uninstall", scope) -async def no_git_lfs_data(self: GitLFSBusiness, git: Git) -> None: +async def no_git_lfs_data( + self: GitLFSBusiness, git: Git, event: GitLfsCheck +) -> None: pass