diff --git a/changelog.d/20231214_124422_rra_DM_42182.md b/changelog.d/20231214_124422_rra_DM_42182.md new file mode 100644 index 00000000..29e4ac9d --- /dev/null +++ b/changelog.d/20231214_124422_rra_DM_42182.md @@ -0,0 +1,3 @@ +### Backwards-incompatible changes + +- Rename the existing `TAPQueryRunner` business to `TAPQuerySetRunner` to more accurately capture what it does. Add a new `TAPQueryRunner` business that runs queries chosen randomly from a list. Based on work by @stvoutsin. diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-object.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-photometry.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-photometry.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-photometry.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-photometry.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-position.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-position.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-position.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-position.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/histogram-flux.sql b/src/mobu/data/tapquerysetrunner/dp0.1/histogram-flux.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/histogram-flux.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/histogram-flux.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/histogram-magnitude.sql b/src/mobu/data/tapquerysetrunner/dp0.1/histogram-magnitude.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/histogram-magnitude.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/histogram-magnitude.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/neighbor-near.sql 
b/src/mobu/data/tapquerysetrunner/dp0.1/neighbor-near.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/neighbor-near.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/neighbor-near.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-one.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-one.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-photometry.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-photometry.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-photometry.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-photometry.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-reference.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-reference.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-reference.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-reference.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-several.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.1/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.1/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.1/polygon-object.sql b/src/mobu/data/tapquerysetrunner/dp0.1/polygon-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/polygon-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/polygon-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/polygon-position.sql 
b/src/mobu/data/tapquerysetrunner/dp0.1/polygon-position.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/polygon-position.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/polygon-position.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-flux.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-flux.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-flux.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-flux.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-magnitude.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-magnitude.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-magnitude.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-magnitude.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-top.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-top.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-top.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-top.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/histogram-forcedsource.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/histogram-forcedsource.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/histogram-forcedsource.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/histogram-forcedsource.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/scan-forcedsource.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/scan-forcedsource.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/scan-forcedsource.sql rename to 
src/mobu/data/tapquerysetrunner/dp0.2-long-scans/scan-forcedsource.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/time-series-several.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/time-series-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/time-series-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/time-series-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/histogram-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/histogram-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/histogram-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/histogram-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/object-several.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/object-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/object-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/object-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object-top.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object-top.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object-top.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object-top.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object.sql diff 
--git a/src/mobu/data/tapqueryrunner/dp0.2/cone-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2/cone-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/cone-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/cone-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/cone-source.sql b/src/mobu/data/tapquerysetrunner/dp0.2/cone-source.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/cone-source.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/cone-source.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/near-neighbor.sql b/src/mobu/data/tapquerysetrunner/dp0.2/near-neighbor.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/near-neighbor.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/near-neighbor.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/object-one.sql b/src/mobu/data/tapquerysetrunner/dp0.2/object-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/object-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/object-one.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2/polygon-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2/polygon-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/polygon-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/polygon-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/polygon-source.sql b/src/mobu/data/tapquerysetrunner/dp0.2/polygon-source.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/polygon-source.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/polygon-source.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/time-series-one.sql 
b/src/mobu/data/tapquerysetrunner/dp0.2/time-series-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/time-series-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/time-series-one.sql diff --git a/src/mobu/models/business/tap.py b/src/mobu/models/business/tap.py new file mode 100644 index 00000000..d657d896 --- /dev/null +++ b/src/mobu/models/business/tap.py @@ -0,0 +1,36 @@ +"""Base models for TAP-related monkey business.""" + +from __future__ import annotations + +from pydantic import Field + +from .base import BusinessData, BusinessOptions + +__all__ = [ + "TAPBusinessData", + "TAPBusinessOptions", +] + + +class TAPBusinessOptions(BusinessOptions): + """Options for any business that runs TAP queries.""" + + sync: bool = Field( + True, + title="Whether to run TAP queries as sync or async", + description=( + "By default, queries to TAP are run via the sync endpoint." + " Set this to false to run as an async query." + ), + example=True, + ) + + +class TAPBusinessData(BusinessData): + """Status of a running TAP business.""" + + running_query: str | None = Field( + None, + title="Currently running query", + description="Will not be present if no query is being executed", + ) diff --git a/src/mobu/models/business/tapqueryrunner.py b/src/mobu/models/business/tapqueryrunner.py index 64ba73e1..465d3970 100644 --- a/src/mobu/models/business/tapqueryrunner.py +++ b/src/mobu/models/business/tapqueryrunner.py @@ -6,32 +6,26 @@ from pydantic import Field -from .base import BusinessConfig, BusinessData, BusinessOptions +from .base import BusinessConfig +from .tap import TAPBusinessOptions __all__ = [ "TAPQueryRunnerConfig", - "TAPQueryRunnerData", "TAPQueryRunnerOptions", ] -class TAPQueryRunnerOptions(BusinessOptions): +class TAPQueryRunnerOptions(TAPBusinessOptions): """Options for TAPQueryRunner monkey business.""" - query_set: str = Field( - "dp0.1", - title="Which query template set to use for a TapQueryRunner", - 
example="dp0.2", - ) - - sync: bool = Field( - True, - title="Whether to run TAP queries as sync or async", - description=( - "By default, queries to TAP are run via the sync endpoint." - " Set this to false to run as an async query." - ), - example=True, + queries: list[str] = Field( + ..., + title="TAP queries", + description="List of queries to be run", + example=[ + "SELECT TOP 10 * FROM TAP_SCHEMA.schemas", + "SELECT TOP 10 * FROM MYDB.MyTable", + ], ) @@ -43,16 +37,5 @@ class TAPQueryRunnerConfig(BusinessConfig): ) options: TAPQueryRunnerOptions = Field( - default_factory=TAPQueryRunnerOptions, - title="Options for the monkey business", - ) - - -class TAPQueryRunnerData(BusinessData): - """Status of a running TAPQueryRunner business.""" - - running_query: str | None = Field( - None, - title="Currently running query", - description="Will not be present if no query is being executed", + ..., title="Options for the monkey business" ) diff --git a/src/mobu/models/business/tapquerysetrunner.py b/src/mobu/models/business/tapquerysetrunner.py new file mode 100644 index 00000000..88b8d174 --- /dev/null +++ b/src/mobu/models/business/tapquerysetrunner.py @@ -0,0 +1,38 @@ +"""Models for the TAPQuerySetRunner monkey business.""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import Field + +from .base import BusinessConfig +from .tap import TAPBusinessOptions + +__all__ = [ + "TAPQuerySetRunnerConfig", + "TAPQuerySetRunnerOptions", +] + + +class TAPQuerySetRunnerOptions(TAPBusinessOptions): + """Options for TAPQuerySetRunner monkey business.""" + + query_set: str = Field( + "dp0.1", + title="Which query template set to use for a TapQueryRunner", + example="dp0.2", + ) + + +class TAPQuerySetRunnerConfig(BusinessConfig): + """Configuration specialization for TAPQuerySetRunner.""" + + type: Literal["TAPQuerySetRunner"] = Field( + ..., title="Type of business to run" + ) + + options: TAPQuerySetRunnerOptions = Field( + 
default_factory=TAPQuerySetRunnerOptions, + title="Options for the monkey business", + ) diff --git a/src/mobu/models/flock.py b/src/mobu/models/flock.py index 9364493b..be4b6aee 100644 --- a/src/mobu/models/flock.py +++ b/src/mobu/models/flock.py @@ -9,6 +9,7 @@ from .business.jupyterpythonloop import JupyterPythonLoopConfig from .business.notebookrunner import NotebookRunnerConfig from .business.tapqueryrunner import TAPQueryRunnerConfig +from .business.tapquerysetrunner import TAPQuerySetRunnerConfig from .monkey import MonkeyData from .user import User, UserSpec @@ -49,6 +50,7 @@ class FlockConfig(BaseModel): business: ( TAPQueryRunnerConfig + | TAPQuerySetRunnerConfig | NotebookRunnerConfig | JupyterPythonLoopConfig | EmptyLoopConfig diff --git a/src/mobu/models/monkey.py b/src/mobu/models/monkey.py index 990a7723..5767d7d5 100644 --- a/src/mobu/models/monkey.py +++ b/src/mobu/models/monkey.py @@ -7,7 +7,7 @@ from .business.base import BusinessData from .business.notebookrunner import NotebookRunnerData from .business.nublado import NubladoBusinessData -from .business.tapqueryrunner import TAPQueryRunnerData +from .business.tap import TAPBusinessData from .user import AuthenticatedUser @@ -38,7 +38,7 @@ class MonkeyData(BaseModel): # to avoid the risk that Pydantic plus FastAPI will interpret a class as # its parent class. 
business: ( - TAPQueryRunnerData + TAPBusinessData | NotebookRunnerData | NubladoBusinessData | BusinessData diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py new file mode 100644 index 00000000..450b94c6 --- /dev/null +++ b/src/mobu/services/business/tap.py @@ -0,0 +1,160 @@ +"""Base class for executing TAP queries.""" + +from __future__ import annotations + +import asyncio +from abc import ABCMeta, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import Generic, TypeVar + +import pyvo +import requests +from httpx import AsyncClient +from structlog.stdlib import BoundLogger + +from ...config import config +from ...exceptions import CodeExecutionError, TAPClientError +from ...models.business.tap import TAPBusinessData, TAPBusinessOptions +from ...models.user import AuthenticatedUser +from .base import Business + +T = TypeVar("T", bound="TAPBusinessOptions") + +__all__ = ["TAPBusiness"] + + +class TAPBusiness(Business, Generic[T], metaclass=ABCMeta): + """Base class for business that executes TAP queries. + + This class modifies the core `~mobu.services.business.base.Business` + loop by providing `startup` and `execute` methods that know how to + execute TAP queries. Subclasses must override `get_next_query` to return + the next query they want to execute. + + Parameters + ---------- + options + Configuration options for the business. + user + User with their authentication token to use to run the business. + http_client + Shared HTTP client for general web access. + logger + Logger to use to report the results of business. 
+ """ + + def __init__( + self, + options: T, + user: AuthenticatedUser, + http_client: AsyncClient, + logger: BoundLogger, + ) -> None: + super().__init__(options, user, http_client, logger) + self._running_query: str | None = None + self._client: pyvo.dal.TAPService | None = None + self._pool = ThreadPoolExecutor(max_workers=1) + + async def startup(self) -> None: + with self.timings.start("make_client"): + self._client = self._make_client(self.user.token) + + @abstractmethod + def get_next_query(self) -> str: + """Get the next TAP query to run. + + Returns + ------- + str + TAP query as a string. + """ + + async def execute(self) -> None: + query = self.get_next_query() + with self.timings.start("execute_query", {"query": query}) as sw: + self._running_query = query + + try: + if self.options.sync: + await self.run_sync_query(query) + else: + await self.run_async_query(query) + except Exception as e: + raise CodeExecutionError( + user=self.user.username, + code=query, + code_type="TAP query", + error=f"{type(e).__name__}: {e!s}", + ) from e + + self._running_query = None + elapsed = sw.elapsed.total_seconds() + + self.logger.info(f"Query finished after {elapsed} seconds") + + async def run_async_query(self, query: str) -> None: + """Run the query asynchronously. + + Parameters + ---------- + query + Query string to execute. + """ + if not self._client: + raise RuntimeError("TAPBusiness startup never ran") + self.logger.info(f"Running (async): {query}") + job = self._client.submit_job(query) + try: + job.run() + while job.phase not in ("COMPLETED", "ERROR", "ABORTED"): + await asyncio.sleep(30) + finally: + job.delete() + + async def run_sync_query(self, query: str) -> None: + """Run the query synchronously. + + Parameters + ---------- + query + Query string to execute. 
+ """ + if not self._client: + raise RuntimeError("TAPBusiness startup never ran") + self.logger.info(f"Running (sync): {query}") + loop = asyncio.get_event_loop() + await loop.run_in_executor(self._pool, self._client.search, query) + + def dump(self) -> TAPBusinessData: + return TAPBusinessData( + running_query=self._running_query, **super().dump().dict() + ) + + def _make_client(self, token: str) -> pyvo.dal.TAPService: + """Create a TAP client. + + Parameters + ---------- + token + User authentication token. + + Returns + ------- + pyvo.dal.TAPService + TAP client object. + """ + if not config.environment_url: + raise RuntimeError("environment_url not set") + tap_url = str(config.environment_url).rstrip("/") + "/api/tap" + try: + s = requests.Session() + s.headers["Authorization"] = "Bearer " + token + auth = pyvo.auth.AuthSession() + auth.credentials.set("lsst-token", s) + auth.add_security_method_for_url(tap_url, "lsst-token") + auth.add_security_method_for_url(tap_url + "/sync", "lsst-token") + auth.add_security_method_for_url(tap_url + "/async", "lsst-token") + auth.add_security_method_for_url(tap_url + "/tables", "lsst-token") + return pyvo.dal.TAPService(tap_url, auth) + except Exception as e: + raise TAPClientError(e, user=self.user.username) from e diff --git a/src/mobu/services/business/tapqueryrunner.py b/src/mobu/services/business/tapqueryrunner.py index e22e0a8f..f2c8b4c8 100644 --- a/src/mobu/services/business/tapqueryrunner.py +++ b/src/mobu/services/business/tapqueryrunner.py @@ -2,31 +2,19 @@ from __future__ import annotations -import asyncio -import importlib.resources -import math -from concurrent.futures import ThreadPoolExecutor from random import SystemRandom -import jinja2 -import pyvo -import requests -import shortuuid -import yaml from httpx import AsyncClient from structlog.stdlib import BoundLogger -from ...config import config -from ...exceptions import CodeExecutionError, TAPClientError -from ...models.business.tapqueryrunner import ( 
- TAPQueryRunnerData, - TAPQueryRunnerOptions, -) +from ...models.business.tapqueryrunner import TAPQueryRunnerOptions from ...models.user import AuthenticatedUser -from .base import Business +from .tap import TAPBusiness +__all__ = ["TAPQueryRunner"] -class TAPQueryRunner(Business): + +class TAPQueryRunner(TAPBusiness): """Run queries against TAP. Parameters @@ -49,158 +37,7 @@ def __init__( logger: BoundLogger, ) -> None: super().__init__(options, user, http_client, logger) - self._running_query: str | None = None - self._client: pyvo.dal.TAPService | None = None self._random = SystemRandom() - self._pool = ThreadPoolExecutor(max_workers=1) - - # Load templates and parameters. The path has to be specified in two - # different ways: as a relative path for Jinja's PackageLoader, and as - # a sequence of joinpath operations for importlib.resources. - template_path = ("data", "tapqueryrunner", options.query_set) - self._env = jinja2.Environment( - loader=jinja2.PackageLoader("mobu", "/".join(template_path)), - undefined=jinja2.StrictUndefined, - autoescape=jinja2.select_autoescape(disabled_extensions=["sql"]), - ) - files = importlib.resources.files("mobu") - for directory in template_path: - files = files.joinpath(directory) - with files.joinpath("params.yaml").open("r") as f: - self._params = yaml.safe_load(f) - - async def startup(self) -> None: - templates = self._env.list_templates(["sql"]) - self.logger.info("Query templates to choose from: %s", templates) - with self.timings.start("make_client"): - self._client = self._make_client(self.user.token) - - async def execute(self) -> None: - template_name = self._random.choice(self._env.list_templates(["sql"])) - template = self._env.get_template(template_name) - query = template.render(self._generate_parameters()) - - with self.timings.start("execute_query", {"query": query}) as sw: - self._running_query = query - - try: - if self.options.sync: - await self.run_sync_query(query) - else: - await 
self.run_async_query(query) - except Exception as e: - raise CodeExecutionError( - user=self.user.username, - code=query, - code_type="TAP query", - error=f"{type(e).__name__}: {e!s}", - ) from e - - self._running_query = None - elapsed = sw.elapsed.total_seconds() - - self.logger.info(f"Query finished after {elapsed} seconds") - - async def run_async_query(self, query: str) -> None: - if not self._client: - raise RuntimeError("TAPQueryRunner startup never ran") - self.logger.info("Running (async): %s", query) - job = self._client.submit_job(query) - try: - job.run() - while job.phase not in ("COMPLETED", "ERROR"): - await asyncio.sleep(30) - finally: - job.delete() - - async def run_sync_query(self, query: str) -> None: - if not self._client: - raise RuntimeError("TAPQueryRunner startup never ran") - self.logger.info("Running (sync): %s", query) - loop = asyncio.get_event_loop() - await loop.run_in_executor(self._pool, self._client.search, query) - - def dump(self) -> TAPQueryRunnerData: - return TAPQueryRunnerData( - running_query=self._running_query, **super().dump().dict() - ) - - def _make_client(self, token: str) -> pyvo.dal.TAPService: - if not config.environment_url: - raise RuntimeError("environment_url not set") - tap_url = str(config.environment_url).rstrip("/") + "/api/tap" - try: - s = requests.Session() - s.headers["Authorization"] = "Bearer " + token - auth = pyvo.auth.AuthSession() - auth.credentials.set("lsst-token", s) - auth.add_security_method_for_url(tap_url, "lsst-token") - auth.add_security_method_for_url(tap_url + "/sync", "lsst-token") - auth.add_security_method_for_url(tap_url + "/async", "lsst-token") - auth.add_security_method_for_url(tap_url + "/tables", "lsst-token") - return pyvo.dal.TAPService(tap_url, auth) - except Exception as e: - raise TAPClientError(e, user=self.user.username) from e - - def _generate_random_polygon( - self, - *, - min_ra: float, - max_ra: float, - min_dec: float, - max_dec: float, - min_radius: float, - 
radius_range: float, - ) -> str: - """Generate a random polygon as comma-separated ra/dec values.""" - ra = min_ra + self._random.random() * (max_ra - min_ra) - dec = min_dec + self._random.random() * (max_dec - min_dec) - r = min_radius + self._random.random() * radius_range - n = self._random.randrange(3, 8) - phi = self._random.random() * 2 * math.pi - poly = [] - for theta in [phi + i * 2 * math.pi / n for i in range(n)]: - poly.append(ra + r * math.sin(theta)) - poly.append(dec + r * math.cos(theta)) - return ", ".join([str(x) for x in poly]) - def _generate_parameters(self) -> dict[str, int | float | str]: - """Generate some random parameters for the query.""" - min_ra = self._params.get("min_ra", 55.0) - max_ra = self._params.get("max_ra", 70.0) - min_dec = self._params.get("min_dec", -42.0) - max_dec = self._params.get("max_dec", -30.0) - min_radius = self._params.get("min_radius", 0.01) - radius_range = self._params.get("radius_range", 0.04) - radius_near_range = self._params.get("radius_near_range", 0.09) - min_flux = 0.0 + self._random.random() * 0.00100 - min_mag = 15.0 + self._random.random() * 15.0 - result = { - "ra": min_ra + self._random.random() * (max_ra - min_ra), - "dec": min_dec + self._random.random() * (max_dec - min_dec), - "min_flux": min_flux, - "max_flux": min_flux + 0.00001, - "min_mag": min_mag, - "max_mag": min_mag + 0.1, - "polygon": self._generate_random_polygon( - min_ra=min_ra, - max_ra=max_ra, - min_dec=min_dec, - max_dec=max_dec, - min_radius=min_radius, - radius_range=radius_range, - ), - "radius": min_radius + self._random.random() * radius_range, - "radius_near": ( - min_radius + self._random.random() * radius_near_range - ), - "username": self.user.username, - "query_id": "mobu-" + shortuuid.uuid(), - } - object_ids = self._params.get("object_ids") - if object_ids: - result["object"] = str(self._random.choice(object_ids)) - result["objects"] = ", ".join( - str(o) for o in self._random.choices(object_ids, k=12) - ) - return 
result + def get_next_query(self) -> str: + return self._random.choice(self.options.queries) diff --git a/src/mobu/services/business/tapquerysetrunner.py b/src/mobu/services/business/tapquerysetrunner.py new file mode 100644 index 00000000..3f997cc4 --- /dev/null +++ b/src/mobu/services/business/tapquerysetrunner.py @@ -0,0 +1,140 @@ +"""Run a set of predefined queries against a TAP service.""" + +from __future__ import annotations + +import importlib.resources +import math +from random import SystemRandom + +import jinja2 +import shortuuid +import yaml +from httpx import AsyncClient +from structlog.stdlib import BoundLogger + +from ...models.business.tapquerysetrunner import TAPQuerySetRunnerOptions +from ...models.user import AuthenticatedUser +from .tap import TAPBusiness + +__all__ = ["TAPQuerySetRunner"] + + +class TAPQuerySetRunner(TAPBusiness): + """Run queries from a predefined set against TAP with random parameters. + + Parameters + ---------- + options + Configuration options for the business. + user + User with their authentication token to use to run the business. + http_client + Shared HTTP client for general web access. + logger + Logger to use to report the results of business. + """ + + def __init__( + self, + options: TAPQuerySetRunnerOptions, + user: AuthenticatedUser, + http_client: AsyncClient, + logger: BoundLogger, + ) -> None: + super().__init__(options, user, http_client, logger) + self._random = SystemRandom() + + # Load templates and parameters. The path has to be specified in two + # different ways: as a relative path for Jinja's PackageLoader, and as + # a sequence of joinpath operations for importlib.resources. 
+ template_path = ("data", "tapquerysetrunner", self.options.query_set) + self._env = jinja2.Environment( + loader=jinja2.PackageLoader("mobu", "/".join(template_path)), + undefined=jinja2.StrictUndefined, + autoescape=jinja2.select_autoescape(disabled_extensions=["sql"]), + ) + files = importlib.resources.files("mobu") + for directory in template_path: + files = files.joinpath(directory) + with files.joinpath("params.yaml").open("r") as f: + self._params = yaml.safe_load(f) + + async def startup(self) -> None: + await super().startup() + templates = self._env.list_templates(["sql"]) + self.logger.info("Query templates to choose from: %s", templates) + + def get_next_query(self) -> str: + """Choose a random query from the query set. + + Returns + ------- + str + Next TAP query to run. + """ + template_name = self._random.choice(self._env.list_templates(["sql"])) + template = self._env.get_template(template_name) + return template.render(self._generate_parameters()) + + def _generate_random_polygon( + self, + *, + min_ra: float, + max_ra: float, + min_dec: float, + max_dec: float, + min_radius: float, + radius_range: float, + ) -> str: + """Generate a random polygon as comma-separated ra/dec values.""" + ra = min_ra + self._random.random() * (max_ra - min_ra) + dec = min_dec + self._random.random() * (max_dec - min_dec) + r = min_radius + self._random.random() * radius_range + n = self._random.randrange(3, 8) + phi = self._random.random() * 2 * math.pi + poly = [] + for theta in [phi + i * 2 * math.pi / n for i in range(n)]: + poly.append(ra + r * math.sin(theta)) + poly.append(dec + r * math.cos(theta)) + return ", ".join([str(x) for x in poly]) + + def _generate_parameters(self) -> dict[str, int | float | str]: + """Generate some random parameters for the query.""" + min_ra = self._params.get("min_ra", 55.0) + max_ra = self._params.get("max_ra", 70.0) + min_dec = self._params.get("min_dec", -42.0) + max_dec = self._params.get("max_dec", -30.0) + min_radius = 
self._params.get("min_radius", 0.01) + radius_range = self._params.get("radius_range", 0.04) + radius_near_range = self._params.get("radius_near_range", 0.09) + min_flux = 0.0 + self._random.random() * 0.00100 + min_mag = 15.0 + self._random.random() * 15.0 + result = { + "ra": min_ra + self._random.random() * (max_ra - min_ra), + "dec": min_dec + self._random.random() * (max_dec - min_dec), + "min_flux": min_flux, + "max_flux": min_flux + 0.00001, + "min_mag": min_mag, + "max_mag": min_mag + 0.1, + "polygon": self._generate_random_polygon( + min_ra=min_ra, + max_ra=max_ra, + min_dec=min_dec, + max_dec=max_dec, + min_radius=min_radius, + radius_range=radius_range, + ), + "radius": min_radius + self._random.random() * radius_range, + "radius_near": ( + min_radius + self._random.random() * radius_near_range + ), + "username": self.user.username, + "query_id": "mobu-" + shortuuid.uuid(), + } + object_ids = self._params.get("object_ids") + if object_ids: + result["object"] = str(self._random.choice(object_ids)) + result["objects"] = ", ".join( + str(o) for o in self._random.choices(object_ids, k=12) + ) + return result diff --git a/src/mobu/services/monkey.py b/src/mobu/services/monkey.py index 2d1ae58a..144b4e39 100644 --- a/src/mobu/services/monkey.py +++ b/src/mobu/services/monkey.py @@ -22,6 +22,7 @@ from ..models.business.jupyterpythonloop import JupyterPythonLoopConfig from ..models.business.notebookrunner import NotebookRunnerConfig from ..models.business.tapqueryrunner import TAPQueryRunnerConfig +from ..models.business.tapquerysetrunner import TAPQuerySetRunnerConfig from ..models.monkey import MonkeyData, MonkeyState from ..models.user import AuthenticatedUser from .business.base import Business @@ -29,6 +30,7 @@ from .business.jupyterpythonloop import JupyterPythonLoop from .business.notebookrunner import NotebookRunner from .business.tapqueryrunner import TAPQueryRunner +from .business.tapquerysetrunner import TAPQuerySetRunner __all__ = ["Monkey"] @@ -99,6 
+101,10 @@ def __init__( self.business = TAPQueryRunner( business_config.options, user, self._http_client, self._logger ) + elif isinstance(business_config, TAPQuerySetRunnerConfig): + self.business = TAPQuerySetRunner( + business_config.options, user, self._http_client, self._logger + ) else: msg = f"Unknown business config {business_config}" raise TypeError(msg) diff --git a/tests/business/tapqueryrunner_test.py b/tests/business/tapqueryrunner_test.py index 4e36f6b7..ed14da42 100644 --- a/tests/business/tapqueryrunner_test.py +++ b/tests/business/tapqueryrunner_test.py @@ -2,23 +2,12 @@ from __future__ import annotations -from pathlib import Path -from typing import cast from unittest.mock import ANY, patch import pytest import pyvo import respx -import structlog -import yaml from httpx import AsyncClient -from safir.dependencies.http_client import http_client_dependency -from safir.testing.slack import MockSlackWebhook - -import mobu -from mobu.models.business.tapqueryrunner import TAPQueryRunnerOptions -from mobu.models.user import AuthenticatedUser -from mobu.services.business.tapqueryrunner import TAPQueryRunner from ..support.gafaelfawr import mock_gafaelfawr from ..support.util import wait_for_business @@ -27,6 +16,10 @@ @pytest.mark.asyncio async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: mock_gafaelfawr(respx_mock) + queries = [ + "SELECT TOP 10 * FROM TAP_SCHEMA.tables", + "SELECT TOP 10 * FROM TAP_SCHEMA.columns", + ] with patch.object(pyvo.dal, "TAPService"): r = await client.put( @@ -36,7 +29,10 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: "count": 1, "user_spec": {"username_prefix": "testuser"}, "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, + "business": { + "type": "TAPQueryRunner", + "options": {"queries": queries}, + }, }, ) assert r.status_code == 201 @@ -63,203 +59,9 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: r = await 
client.get("/mobu/flocks/test/monkeys/testuser1/log") assert r.status_code == 200 assert "Running (sync): " in r.text + found = False + for query in queries: + if query in r.text: + found = True + assert found, "Ran one of the appropriate queries" assert "Query finished after " in r.text - - -@pytest.mark.asyncio -async def test_setup_error( - client: AsyncClient, - slack: MockSlackWebhook, - respx_mock: respx.Router, -) -> None: - """Test that client creation is deferred to setup. - - This also doubles as a test that failures during setup are recorded as a - failed test execution and result in a Slack alert. - """ - mock_gafaelfawr(respx_mock) - - r = await client.put( - "/mobu/flocks", - json={ - "name": "test", - "count": 1, - "users": [{"username": "tapuser"}], - "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, - }, - ) - assert r.status_code == 201 - - # Wait until we've finished at least one loop and check the results. - data = await wait_for_business(client, "tapuser") - assert data["business"]["failure_count"] == 1 - - assert slack.messages == [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ( - "Unable to create TAP client: DALServiceError:" - " No working capabilities endpoint provided" - ), - "verbatim": True, - }, - }, - { - "type": "section", - "fields": [ - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - { - "type": "mrkdwn", - "text": "*Exception type*\nTAPClientError", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Monkey*\ntest/tapuser", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*User*\ntapuser", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Event*\nmake_client", - "verbatim": True, - }, - ], - }, - {"type": "divider"}, - ] - } - ] - - -@pytest.mark.asyncio -async def test_alert( - client: AsyncClient, slack: MockSlackWebhook, respx_mock: respx.Router -) -> None: - 
mock_gafaelfawr(respx_mock) - - with patch.object(pyvo.dal, "TAPService") as mock: - mock.return_value.search.side_effect = [Exception("some error")] - - r = await client.put( - "/mobu/flocks", - json={ - "name": "test", - "count": 1, - "user_spec": {"username_prefix": "testuser"}, - "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, - }, - ) - assert r.status_code == 201 - - # Wait until we've finished at least one loop and check the results. - data = await wait_for_business(client, "testuser1") - assert data["business"]["failure_count"] == 1 - - assert slack.messages == [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": "Error while running TAP query", - "verbatim": True, - }, - }, - { - "type": "section", - "fields": [ - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - { - "type": "mrkdwn", - "text": "*Exception type*\nCodeExecutionError", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Monkey*\ntest/testuser1", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*User*\ntestuser1", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Event*\nexecute_query", - "verbatim": True, - }, - ], - }, - ], - "attachments": [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ( - "*Error*\n" - "```\nException: some error\n```" - ), - "verbatim": True, - }, - }, - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ANY, - "verbatim": True, - }, - }, - ] - } - ], - } - ] - - -@pytest.mark.asyncio -async def test_random_object() -> None: - for query_set in ("dp0.1", "dp0.2"): - params_path = ( - Path(mobu.__file__).parent - / "data" - / "tapqueryrunner" - / query_set - / "params.yaml" - ) - with params_path.open("r") as f: - objects = [str(o) for o in yaml.safe_load(f)["object_ids"]] - - user = AuthenticatedUser( - username="user", scopes=["read:tap"], token="blah blah" - ) - logger = 
structlog.get_logger(__file__) - options = TAPQueryRunnerOptions(query_set=query_set) - http_client = await http_client_dependency() - with patch.object(pyvo.dal, "TAPService"): - runner = TAPQueryRunner(options, user, http_client, logger) - parameters = runner._generate_parameters() - - assert parameters["object"] in objects - random_objects = cast(str, parameters["objects"]).split(", ") - assert len(random_objects) == 12 - for obj in random_objects: - assert obj in objects diff --git a/tests/business/tapquerysetrunner_test.py b/tests/business/tapquerysetrunner_test.py new file mode 100644 index 00000000..3629ab0e --- /dev/null +++ b/tests/business/tapquerysetrunner_test.py @@ -0,0 +1,264 @@ +"""Tests for TAPQuerySetRunner.""" + +from __future__ import annotations + +from pathlib import Path +from typing import cast +from unittest.mock import ANY, patch + +import pytest +import pyvo +import respx +import structlog +import yaml +from httpx import AsyncClient +from safir.dependencies.http_client import http_client_dependency +from safir.testing.slack import MockSlackWebhook + +import mobu +from mobu.models.business.tapquerysetrunner import TAPQuerySetRunnerOptions +from mobu.models.user import AuthenticatedUser +from mobu.services.business.tapquerysetrunner import TAPQuerySetRunner + +from ..support.gafaelfawr import mock_gafaelfawr +from ..support.util import wait_for_business + + +@pytest.mark.asyncio +async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: + mock_gafaelfawr(respx_mock) + + with patch.object(pyvo.dal, "TAPService"): + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "user_spec": {"username_prefix": "testuser"}, + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. 
+ data = await wait_for_business(client, "testuser1") + assert data == { + "name": "testuser1", + "business": { + "failure_count": 0, + "name": "TAPQuerySetRunner", + "success_count": 1, + "timings": ANY, + }, + "state": "RUNNING", + "user": { + "scopes": ["exec:notebook"], + "token": ANY, + "username": "testuser1", + }, + } + + # Get the log and check that we logged the query. + r = await client.get("/mobu/flocks/test/monkeys/testuser1/log") + assert r.status_code == 200 + assert "Running (sync): " in r.text + assert "Query finished after " in r.text + + +@pytest.mark.asyncio +async def test_setup_error( + client: AsyncClient, + slack: MockSlackWebhook, + respx_mock: respx.Router, +) -> None: + """Test that client creation is deferred to setup. + + This also doubles as a test that failures during setup are recorded as a + failed test execution and result in a Slack alert. + """ + mock_gafaelfawr(respx_mock) + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "users": [{"username": "tapuser"}], + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. 
+ data = await wait_for_business(client, "tapuser") + assert data["business"]["failure_count"] == 1 + + assert slack.messages == [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ( + "Unable to create TAP client: DALServiceError:" + " No working capabilities endpoint provided" + ), + "verbatim": True, + }, + }, + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + { + "type": "mrkdwn", + "text": "*Exception type*\nTAPClientError", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Monkey*\ntest/tapuser", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*User*\ntapuser", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Event*\nmake_client", + "verbatim": True, + }, + ], + }, + {"type": "divider"}, + ] + } + ] + + +@pytest.mark.asyncio +async def test_alert( + client: AsyncClient, slack: MockSlackWebhook, respx_mock: respx.Router +) -> None: + mock_gafaelfawr(respx_mock) + + with patch.object(pyvo.dal, "TAPService") as mock: + mock.return_value.search.side_effect = [Exception("some error")] + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "user_spec": {"username_prefix": "testuser"}, + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. 
+ data = await wait_for_business(client, "testuser1") + assert data["business"]["failure_count"] == 1 + + assert slack.messages == [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "Error while running TAP query", + "verbatim": True, + }, + }, + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + { + "type": "mrkdwn", + "text": "*Exception type*\nCodeExecutionError", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Monkey*\ntest/testuser1", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*User*\ntestuser1", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Event*\nexecute_query", + "verbatim": True, + }, + ], + }, + ], + "attachments": [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ( + "*Error*\n```\nException: some error\n```" + ), + "verbatim": True, + }, + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ANY, + "verbatim": True, + }, + }, + ] + } + ], + } + ] + + +@pytest.mark.asyncio +async def test_random_object() -> None: + for query_set in ("dp0.1", "dp0.2"): + params_path = ( + Path(mobu.__file__).parent + / "data" + / "tapquerysetrunner" + / query_set + / "params.yaml" + ) + with params_path.open("r") as f: + objects = [str(o) for o in yaml.safe_load(f)["object_ids"]] + + user = AuthenticatedUser( + username="user", scopes=["read:tap"], token="blah blah" + ) + logger = structlog.get_logger(__file__) + options = TAPQuerySetRunnerOptions(query_set=query_set) + http_client = await http_client_dependency() + with patch.object(pyvo.dal, "TAPService"): + runner = TAPQuerySetRunner(options, user, http_client, logger) + parameters = runner._generate_parameters() + + assert parameters["object"] in objects + random_objects = cast(str, parameters["objects"]).split(", ") + assert len(random_objects) == 12 + for obj in random_objects: + assert obj 
in objects