From 70b4760b49970dd67224105660b0b5f1ac3641ed Mon Sep 17 00:00:00 2001 From: Stelios Voutsinas Date: Thu, 28 Sep 2023 16:52:10 +0300 Subject: [PATCH 1/2] Add option to use query lists from options in TAPQueryRunner --- src/mobu/models/business/tapqueryrunner.py | 11 +- src/mobu/services/business/tapqueryrunner.py | 203 +++++++++++++++++-- tests/business/tapqueryrunner_test.py | 21 +- 3 files changed, 211 insertions(+), 24 deletions(-) diff --git a/src/mobu/models/business/tapqueryrunner.py b/src/mobu/models/business/tapqueryrunner.py index 64ba73e1..dd0091d4 100644 --- a/src/mobu/models/business/tapqueryrunner.py +++ b/src/mobu/models/business/tapqueryrunner.py @@ -34,13 +34,18 @@ class TAPQueryRunnerOptions(BusinessOptions): example=True, ) + queries: list | None = Field( + None, + title="Which query list to use for the TapQueryRunner", + description="List of queries to be run instead of a query_set", + example=True, + ) + class TAPQueryRunnerConfig(BusinessConfig): """Configuration specialization for TAPQueryRunner.""" - type: Literal["TAPQueryRunner"] = Field( - ..., title="Type of business to run" - ) + type: Literal["TAPQueryRunner"] = Field(..., title="Type of business to run") options: TAPQueryRunnerOptions = Field( default_factory=TAPQueryRunnerOptions, diff --git a/src/mobu/services/business/tapqueryrunner.py b/src/mobu/services/business/tapqueryrunner.py index e22e0a8f..95cfd016 100644 --- a/src/mobu/services/business/tapqueryrunner.py +++ b/src/mobu/services/business/tapqueryrunner.py @@ -5,9 +5,10 @@ import asyncio import importlib.resources import math +from typing import Any, Protocol, Union, runtime_checkable from concurrent.futures import ThreadPoolExecutor from random import SystemRandom - +from enum import Enum import jinja2 import pyvo import requests @@ -26,6 +27,95 @@ from .base import Business +@runtime_checkable +class TAPQueryContext(Protocol): + """Query Context Protocol + Defines the methods that should be implemented for various query context implementations. + Query context: Where/how the collection of queries to be run is generated from. + """ + + def __init__(self, **_arg: Any) -> None: + ... + + class ContextTypes(Enum): + """Define different types of query contexts""" + + QUERY_LIST = "QUERY_LIST" # List of queries passed in as options + TEMPLATES = "TEMPLATES" # Templates from local filepath + + @property + def context_type(self) -> TAPQueryContext.ContextTypes: + """Get the context type""" + ... + + def get_next_query(self) -> str: + """Get the next query""" + ... + + +class TAPQueryContextTemplates: + """Context is template based here, i.e. the queries are read from local filepath as + Jinja templates. + """ + + def __init__(self, taprunner: TAPQueryRunner) -> None: + self.taprunner = taprunner + + @property + def context_type(self) -> TAPQueryContext.ContextTypes: + """Get Context Type + + Returns: + TAPQueryContext.ContextTypes: The context type + """ + return TAPQueryContext.ContextTypes.TEMPLATES + + def get_next_query(self) -> str: + """Get a query from the query_set randomly, using the random_engine of the TAP Runner + Render query from template, using generated parameters + + Returns: + str: The next query string + """ + template_name = self.taprunner.random_engine.choice( + self.taprunner.env.list_templates(["sql"]) + ) + template = self.taprunner.env.get_template(template_name) + query = template.render(self.taprunner.generated_params) + return query + + +class TAPQueryContextQueryList: + """Context for generating queries from a given list of query strings.""" + + def __init__(self, taprunner: TAPQueryRunner) -> None: + self.taprunner = taprunner + + @property + def context_type(self) -> TAPQueryContext.ContextTypes: + """Get Context Type + + Returns: + TAPQueryContext.ContextTypes: The context type + """ + return TAPQueryContext.ContextTypes.QUERY_LIST + + def get_next_query(self) -> str: + """Get a query from the list randomly, using the random_engine of the TAP Runner + + Returns: + str: The next query string + """ + return self.taprunner.random_engine.choice(self.taprunner.queries) + + +# Mapping of context types to TAPQueryContext class type +TAP_QUERY_CONTEXTS = { + TAPQueryContext.ContextTypes.QUERY_LIST: TAPQueryContextQueryList, + TAPQueryContext.ContextTypes.TEMPLATES: TAPQueryContextTemplates, +} + + class TAPQueryRunner(Business): """Run queries against TAP. @@ -53,33 +143,104 @@ def __init__( self._client: pyvo.dal.TAPService | None = None self._random = SystemRandom() self._pool = ThreadPoolExecutor(max_workers=1) + self._context = self._get_context(options) + self._env = self._get_environment() + self._params = self._get_params() + self._queries = options.queries + + async def startup(self) -> None: + if self._context.context_type is TAPQueryContext.ContextTypes.TEMPLATES: + templates = self._env.list_templates(["sql"]) + self.logger.info("Query templates to choose from: %s", templates) + with self.timings.start("make_client"): + self._client = self._make_client(self.user.token) + + def get_next_query(self) -> str: + """Get the next query string from the context + + Returns: + str: The next query string + """ + return self._context.get_next_query() + + @property + def queries(self): + return self._queries + + @property + def env(self): + return self._env + + @property + def random_engine(self): + return self._random + + @property + def params(self): + return self._params + + @property + def generated_params(self): + return self._generate_parameters() + + def _get_context( + self, + options, + ) -> Union[TAPQueryContextQueryList, TAPQueryContextTemplates]: + """Get the context for this TAP query runner + + Parameters: + options (TAPQueryRunnerOptions): The runner options based on which to get the context + """ + if options.queries: + return TAP_QUERY_CONTEXTS[TAPQueryContext.ContextTypes.QUERY_LIST]( + taprunner=self + ) + return TAP_QUERY_CONTEXTS[TAPQueryContext.ContextTypes.TEMPLATES]( + taprunner=self + ) + + def _get_environment(self) -> Union[jinja2.Environment, None]: + """Get the jinha2 template if applicable else return None + + Returns: + Union[jinja2.Environment, None]: Return the jinja2 Environment, or None + """ + if self._context.context_type is not TAPQueryContext.ContextTypes.TEMPLATES: + return None # Load templates and parameters. The path has to be specified in two # different ways: as a relative path for Jinja's PackageLoader, and as # a sequence of joinpath operations for importlib.resources. - template_path = ("data", "tapqueryrunner", options.query_set) - self._env = jinja2.Environment( + template_path = ("data", "tapqueryrunner", self.options.query_set) + env = jinja2.Environment( loader=jinja2.PackageLoader("mobu", "/".join(template_path)), undefined=jinja2.StrictUndefined, autoescape=jinja2.select_autoescape(disabled_extensions=["sql"]), ) + return env + + def _get_params(self) -> Union[dict, None]: + """Get the parameters as a dictionary if applicable else return None + + Returns: + Union[dict, None]: Return the parameters as a dict, or None + """ + if self._context.context_type is not TAPQueryContext.ContextTypes.TEMPLATES: + return None + template_path = ("data", "tapqueryrunner", self.options.query_set) files = importlib.resources.files("mobu") for directory in template_path: files = files.joinpath(directory) with files.joinpath("params.yaml").open("r") as f: - self._params = yaml.safe_load(f) - - async def startup(self) -> None: - templates = self._env.list_templates(["sql"]) - self.logger.info("Query templates to choose from: %s", templates) - with self.timings.start("make_client"): - self._client = self._make_client(self.user.token) + params = yaml.safe_load(f) + return params async def execute(self) -> None: - template_name = self._random.choice(self._env.list_templates(["sql"])) - template = self._env.get_template(template_name) - query = template.render(self._generate_parameters()) - + """Get and execute the next query from the context, synchronously or asynchronously + depending on options + """ + query = self.get_next_query() with self.timings.start("execute_query", {"query": query}) as sw: self._running_query = query @@ -102,6 +263,11 @@ async def execute(self) -> None: self.logger.info(f"Query finished after {elapsed} seconds") async def run_async_query(self, query: str) -> None: + """Run the query asynchronously + + Parameters: + query (str): The query string to execute + """ if not self._client: raise RuntimeError("TAPQueryRunner startup never ran") self.logger.info("Running (async): %s", query) @@ -114,6 +280,11 @@ async def run_async_query(self, query: str) -> None: job.delete() async def run_sync_query(self, query: str) -> None: + """Run the query synchronously + + Parameters: + query (str): The query string to execute + """ if not self._client: raise RuntimeError("TAPQueryRunner startup never ran") self.logger.info("Running (sync): %s", query) @@ -191,9 +362,7 @@ def _generate_parameters(self) -> dict[str, int | float | str]: radius_range=radius_range, ), "radius": min_radius + self._random.random() * radius_range, - "radius_near": ( - min_radius + self._random.random() * radius_near_range - ), + "radius_near": (min_radius + self._random.random() * radius_near_range), "username": self.user.username, "query_id": "mobu-" + shortuuid.uuid(), } diff --git a/tests/business/tapqueryrunner_test.py b/tests/business/tapqueryrunner_test.py index 4e36f6b7..9643d58a 100644 --- a/tests/business/tapqueryrunner_test.py +++ b/tests/business/tapqueryrunner_test.py @@ -213,10 +213,7 @@ async def test_alert( "type": "section", "text": { "type": "mrkdwn", - "text": ( - "*Error*\n" - "```\nException: some error\n```" - ), + "text": ("*Error*\n" "```\nException: some error\n```"), "verbatim": True, }, }, @@ -263,3 +260,19 @@ async def test_random_object() -> None: assert len(random_objects) == 12 for obj in random_objects: assert obj in objects + + +@pytest.mark.asyncio +async def test_query_list() -> None: + queries = [ + "SELECT TOP 10 * FROM TAP_SCHEMA.tables", + "SELECT TOP 10 * FROM TAP_SCHEMA.columns", + ] + user = AuthenticatedUser(username="user", scopes=["read:tap"], token="blah blah") + logger = structlog.get_logger(__file__) + options = TAPQueryRunnerOptions(queries=queries) + http_client = await http_client_dependency() + with patch.object(pyvo.dal, "TAPService"): + runner = TAPQueryRunner(options, user, http_client, logger) + generated_query = runner.get_next_query() + assert generated_query in queries From 39e0fc52571dfd933ebf4a31958d286a84e345f9 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Thu, 14 Dec 2023 11:56:13 -0800 Subject: [PATCH 2/2] Refactor support for a list of TAP queries Rather than add a new layer of abstraction to allow two different configurations of TAPQueryRunner, abstract the common functionality of businesses that run TAP queries, rename the old TAPQueryRunner to TAPQuerySetRunner, and add a new TAPQueryRunner that takes a simple list of queries. This follows the existing pattern for keeping each business simple and supporting a variety of tests via separate business classes. --- changelog.d/20231214_124422_rra_DM_42182.md | 3 + .../dp0.1/cone-object.sql | 0 .../dp0.1/cone-photometry.sql | 0 .../dp0.1/cone-position.sql | 0 .../dp0.1/histogram-flux.sql | 0 .../dp0.1/histogram-magnitude.sql | 0 .../dp0.1/neighbor-near.sql | 0 .../dp0.1/object-one.sql | 0 .../dp0.1/object-photometry.sql | 0 .../dp0.1/object-reference.sql | 0 .../dp0.1/object-several.sql | 0 .../dp0.1/params.yaml | 0 .../dp0.1/polygon-object.sql | 0 .../dp0.1/polygon-position.sql | 0 .../dp0.1/scan-flux.sql | 0 .../dp0.1/scan-magnitude.sql | 0 .../dp0.1/scan-top.sql | 0 .../histogram-forcedsource.sql | 0 .../dp0.2-long-scans/params.yaml | 0 .../dp0.2-long-scans/scan-forcedsource.sql | 0 .../dp0.2-long-scans/time-series-several.sql | 0 .../dp0.2-med-scans/histogram-object.sql | 0 .../dp0.2-med-scans/object-several.sql | 0 .../dp0.2-med-scans/params.yaml | 0 .../dp0.2-med-scans/scan-object-top.sql | 0 .../dp0.2-med-scans/scan-object.sql | 0 .../dp0.2/cone-object.sql | 0 .../dp0.2/cone-source.sql | 0 .../dp0.2/near-neighbor.sql | 0 .../dp0.2/object-one.sql | 0 .../dp0.2/params.yaml | 0 .../dp0.2/polygon-object.sql | 0 .../dp0.2/polygon-source.sql | 0 .../dp0.2/time-series-one.sql | 0 src/mobu/models/business/tap.py | 36 ++ src/mobu/models/business/tapqueryrunner.py | 52 +-- src/mobu/models/business/tapquerysetrunner.py | 38 ++ src/mobu/models/flock.py | 2 + src/mobu/models/monkey.py | 4 +- src/mobu/services/business/tap.py | 160 ++++++++ src/mobu/services/business/tapqueryrunner.py | 344 +----------------- .../services/business/tapquerysetrunner.py | 140 +++++++ src/mobu/services/monkey.py | 6 + tests/business/tapqueryrunner_test.py | 237 +----------- tests/business/tapquerysetrunner_test.py | 264 ++++++++++++++ 45 files changed, 685 insertions(+), 601 deletions(-) create mode 100644 changelog.d/20231214_124422_rra_DM_42182.md rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/cone-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/cone-photometry.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/cone-position.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/histogram-flux.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/histogram-magnitude.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/neighbor-near.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/object-one.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/object-photometry.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/object-reference.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/object-several.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/params.yaml (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/polygon-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/polygon-position.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/scan-flux.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/scan-magnitude.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.1/scan-top.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-long-scans/histogram-forcedsource.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-long-scans/params.yaml (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-long-scans/scan-forcedsource.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-long-scans/time-series-several.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-med-scans/histogram-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-med-scans/object-several.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-med-scans/params.yaml (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-med-scans/scan-object-top.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2-med-scans/scan-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/cone-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/cone-source.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/near-neighbor.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/object-one.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/params.yaml (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/polygon-object.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/polygon-source.sql (100%) rename src/mobu/data/{tapqueryrunner => tapquerysetrunner}/dp0.2/time-series-one.sql (100%) create mode 100644 src/mobu/models/business/tap.py create mode 100644 src/mobu/models/business/tapquerysetrunner.py create mode 100644 src/mobu/services/business/tap.py create mode 100644 src/mobu/services/business/tapquerysetrunner.py create mode 100644 tests/business/tapquerysetrunner_test.py diff --git a/changelog.d/20231214_124422_rra_DM_42182.md b/changelog.d/20231214_124422_rra_DM_42182.md new file mode 100644 index 00000000..29e4ac9d --- /dev/null +++ b/changelog.d/20231214_124422_rra_DM_42182.md @@ -0,0 +1,3 @@ +### Backwards-incompatible changes + +- Rename the existing `TAPQueryRunner` business to `TAPQuerySetRunner` to more accurately capture what it does. Add a new `TAPQueryRunner` business that runs queries chosen randomly from a list. Based on work by @stvoutsin. diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-object.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-photometry.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-photometry.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-photometry.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-photometry.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/cone-position.sql b/src/mobu/data/tapquerysetrunner/dp0.1/cone-position.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/cone-position.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/cone-position.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/histogram-flux.sql b/src/mobu/data/tapquerysetrunner/dp0.1/histogram-flux.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/histogram-flux.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/histogram-flux.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/histogram-magnitude.sql b/src/mobu/data/tapquerysetrunner/dp0.1/histogram-magnitude.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/histogram-magnitude.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/histogram-magnitude.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/neighbor-near.sql b/src/mobu/data/tapquerysetrunner/dp0.1/neighbor-near.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/neighbor-near.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/neighbor-near.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-one.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-one.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-photometry.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-photometry.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-photometry.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-photometry.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-reference.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-reference.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-reference.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-reference.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/object-several.sql b/src/mobu/data/tapquerysetrunner/dp0.1/object-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/object-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/object-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.1/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.1/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.1/polygon-object.sql b/src/mobu/data/tapquerysetrunner/dp0.1/polygon-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/polygon-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/polygon-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/polygon-position.sql b/src/mobu/data/tapquerysetrunner/dp0.1/polygon-position.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/polygon-position.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/polygon-position.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-flux.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-flux.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-flux.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-flux.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-magnitude.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-magnitude.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-magnitude.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-magnitude.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.1/scan-top.sql b/src/mobu/data/tapquerysetrunner/dp0.1/scan-top.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.1/scan-top.sql rename to src/mobu/data/tapquerysetrunner/dp0.1/scan-top.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/histogram-forcedsource.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/histogram-forcedsource.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/histogram-forcedsource.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/histogram-forcedsource.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/scan-forcedsource.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/scan-forcedsource.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/scan-forcedsource.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/scan-forcedsource.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-long-scans/time-series-several.sql b/src/mobu/data/tapquerysetrunner/dp0.2-long-scans/time-series-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-long-scans/time-series-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-long-scans/time-series-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/histogram-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/histogram-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/histogram-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/histogram-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/object-several.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/object-several.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/object-several.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/object-several.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object-top.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object-top.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object-top.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object-top.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2-med-scans/scan-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2-med-scans/scan-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/cone-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2/cone-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/cone-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/cone-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/cone-source.sql b/src/mobu/data/tapquerysetrunner/dp0.2/cone-source.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/cone-source.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/cone-source.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/near-neighbor.sql b/src/mobu/data/tapquerysetrunner/dp0.2/near-neighbor.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/near-neighbor.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/near-neighbor.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/object-one.sql b/src/mobu/data/tapquerysetrunner/dp0.2/object-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/object-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/object-one.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/params.yaml b/src/mobu/data/tapquerysetrunner/dp0.2/params.yaml similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/params.yaml rename to src/mobu/data/tapquerysetrunner/dp0.2/params.yaml diff --git a/src/mobu/data/tapqueryrunner/dp0.2/polygon-object.sql b/src/mobu/data/tapquerysetrunner/dp0.2/polygon-object.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/polygon-object.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/polygon-object.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/polygon-source.sql b/src/mobu/data/tapquerysetrunner/dp0.2/polygon-source.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/polygon-source.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/polygon-source.sql diff --git a/src/mobu/data/tapqueryrunner/dp0.2/time-series-one.sql b/src/mobu/data/tapquerysetrunner/dp0.2/time-series-one.sql similarity index 100% rename from src/mobu/data/tapqueryrunner/dp0.2/time-series-one.sql rename to src/mobu/data/tapquerysetrunner/dp0.2/time-series-one.sql diff --git a/src/mobu/models/business/tap.py b/src/mobu/models/business/tap.py new file mode 100644 index 00000000..d657d896 --- /dev/null +++ b/src/mobu/models/business/tap.py @@ -0,0 +1,36 @@ +"""Base models for TAP-related monkey business.""" + +from __future__ import annotations + +from pydantic import Field + +from .base import BusinessData, BusinessOptions + +__all__ = [ + "TAPBusinessData", + "TAPBusinessOptions", +] + + +class TAPBusinessOptions(BusinessOptions): + """Options for any business that runs TAP queries.""" + + sync: bool = Field( + True, + title="Whether to run TAP queries as sync or async", + description=( + "By default, queries to TAP are run via the sync endpoint." + " Set this to false to run as an async query." + ), + example=True, + ) + + +class TAPBusinessData(BusinessData): + """Status of a running TAPQueryRunner business.""" + + running_query: str | None = Field( + None, + title="Currently running query", + description="Will not be present if no query is being executed", + ) diff --git a/src/mobu/models/business/tapqueryrunner.py b/src/mobu/models/business/tapqueryrunner.py index dd0091d4..465d3970 100644 --- a/src/mobu/models/business/tapqueryrunner.py +++ b/src/mobu/models/business/tapqueryrunner.py @@ -6,58 +6,36 @@ from pydantic import Field -from .base import BusinessConfig, BusinessData, BusinessOptions +from .base import BusinessConfig +from .tap import TAPBusinessOptions __all__ = [ "TAPQueryRunnerConfig", - "TAPQueryRunnerData", "TAPQueryRunnerOptions", ] -class TAPQueryRunnerOptions(BusinessOptions): +class TAPQueryRunnerOptions(TAPBusinessOptions): """Options for TAPQueryRunner monkey business.""" - query_set: str = Field( - "dp0.1", - title="Which query template set to use for a TapQueryRunner", - example="dp0.2", - ) - - sync: bool = Field( - True, - title="Whether to run TAP queries as sync or async", - description=( - "By default, queries to TAP are run via the sync endpoint." - " Set this to false to run as an async query." - ), - example=True, - ) - - queries: list | None = Field( - None, - title="Which query list to use for the TapQueryRunner", - description="List of queries to be run instead of a query_set", - example=True, + queries: list[str] = Field( + ..., + title="TAP queries", + description="List of queries to be run", + example=[ + "SELECT TOP 10 * FROM TAP_SCHEMA.schemas", + "SELECT TOP 10 * FROM MYDB.MyTable", + ], ) class TAPQueryRunnerConfig(BusinessConfig): """Configuration specialization for TAPQueryRunner.""" - type: Literal["TAPQueryRunner"] = Field(..., title="Type of business to run") - - options: TAPQueryRunnerOptions = Field( - default_factory=TAPQueryRunnerOptions, - title="Options for the monkey business", + type: Literal["TAPQueryRunner"] = Field( + ..., title="Type of business to run" ) - -class TAPQueryRunnerData(BusinessData): - """Status of a running TAPQueryRunner business.""" - - running_query: str | None = Field( - None, - title="Currently running query", - description="Will not be present if no query is being executed", + options: TAPQueryRunnerOptions = Field( + ..., title="Options for the monkey business" ) diff --git a/src/mobu/models/business/tapquerysetrunner.py b/src/mobu/models/business/tapquerysetrunner.py new file mode 100644 index 00000000..88b8d174 --- /dev/null +++ b/src/mobu/models/business/tapquerysetrunner.py @@ -0,0 +1,38 @@ +"""Models for the TAPQuerySetRunner monkey business.""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import Field + +from .base import BusinessConfig +from .tap import TAPBusinessOptions + +__all__ = [ + "TAPQuerySetRunnerConfig", + "TAPQuerySetRunnerOptions", +] + + +class TAPQuerySetRunnerOptions(TAPBusinessOptions): + """Options for TAPQueryRunner monkey business.""" + + query_set: str = Field( + "dp0.1", + title="Which query template set to use for a TapQueryRunner", + example="dp0.2", + ) + + +class TAPQuerySetRunnerConfig(BusinessConfig): + """Configuration specialization for TAPQuerySetRunner.""" + + type: Literal["TAPQuerySetRunner"] = Field( + ..., title="Type of business to run" + ) + + options: TAPQuerySetRunnerOptions = Field( + default_factory=TAPQuerySetRunnerOptions, + title="Options for the monkey business", + ) diff --git a/src/mobu/models/flock.py b/src/mobu/models/flock.py index 9364493b..be4b6aee 100644 --- a/src/mobu/models/flock.py +++ b/src/mobu/models/flock.py @@ -9,6 +9,7 @@ from .business.jupyterpythonloop import JupyterPythonLoopConfig from .business.notebookrunner import NotebookRunnerConfig from .business.tapqueryrunner import TAPQueryRunnerConfig +from .business.tapquerysetrunner import TAPQuerySetRunnerConfig from .monkey import MonkeyData from .user import User, UserSpec @@ -49,6 +50,7 @@ class FlockConfig(BaseModel): business: ( TAPQueryRunnerConfig + | TAPQuerySetRunnerConfig | NotebookRunnerConfig | JupyterPythonLoopConfig | EmptyLoopConfig diff --git a/src/mobu/models/monkey.py b/src/mobu/models/monkey.py index 990a7723..5767d7d5 100644 --- a/src/mobu/models/monkey.py +++ b/src/mobu/models/monkey.py @@ -7,7 +7,7 @@ from .business.base import BusinessData from .business.notebookrunner import NotebookRunnerData from .business.nublado import NubladoBusinessData -from .business.tapqueryrunner import TAPQueryRunnerData +from .business.tap import TAPBusinessData from .user import AuthenticatedUser @@ -38,7 +38,7 @@ class MonkeyData(BaseModel): # to avoid the risk that Pydantic plus FastAPI will interpret a class as # its parent class. business: ( - TAPQueryRunnerData + TAPBusinessData | NotebookRunnerData | NubladoBusinessData | BusinessData diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py new file mode 100644 index 00000000..450b94c6 --- /dev/null +++ b/src/mobu/services/business/tap.py @@ -0,0 +1,160 @@ +"""Base class for executing TAP queries.""" + +from __future__ import annotations + +import asyncio +from abc import ABCMeta, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import Generic, TypeVar + +import pyvo +import requests +from httpx import AsyncClient +from structlog.stdlib import BoundLogger + +from ...config import config +from ...exceptions import CodeExecutionError, TAPClientError +from ...models.business.tap import TAPBusinessData, TAPBusinessOptions +from ...models.user import AuthenticatedUser +from .base import Business + +T = TypeVar("T", bound="TAPBusinessOptions") + +__all__ = ["TAPBusiness"] + + +class TAPBusiness(Business, Generic[T], metaclass=ABCMeta): + """Base class for business that executes TAP query. + + This class modifies the core `~mobu.business.base.Business` loop by + providing `startup`, `execute`, and `shutdown` methods that know how to + execute TAP queries. Subclasses must override `get_next_query` to return + the next query they want to execute. + + Parameters + ---------- + options + Configuration options for the business. + user + User with their authentication token to use to run the business. + http_client + Shared HTTP client for general web access. + logger + Logger to use to report the results of business. + """ + + def __init__( + self, + options: T, + user: AuthenticatedUser, + http_client: AsyncClient, + logger: BoundLogger, + ) -> None: + super().__init__(options, user, http_client, logger) + self._running_query: str | None = None + self._client: pyvo.dal.TAPService | None = None + self._pool = ThreadPoolExecutor(max_workers=1) + + async def startup(self) -> None: + with self.timings.start("make_client"): + self._client = self._make_client(self.user.token) + + @abstractmethod + def get_next_query(self) -> str: + """Get the next TAP query to run. + + Returns + ------- + str + TAP query as a string. + """ + + async def execute(self) -> None: + query = self.get_next_query() + with self.timings.start("execute_query", {"query": query}) as sw: + self._running_query = query + + try: + if self.options.sync: + await self.run_sync_query(query) + else: + await self.run_async_query(query) + except Exception as e: + raise CodeExecutionError( + user=self.user.username, + code=query, + code_type="TAP query", + error=f"{type(e).__name__}: {e!s}", + ) from e + + self._running_query = None + elapsed = sw.elapsed.total_seconds() + + self.logger.info(f"Query finished after {elapsed} seconds") + + async def run_async_query(self, query: str) -> None: + """Run the query asynchronously. + + Parameters + ---------- + query + Query string to execute. + """ + if not self._client: + raise RuntimeError("TAPBusiness startup never ran") + self.logger.info(f"Running (async): {query}") + job = self._client.submit_job(query) + try: + job.run() + while job.phase not in ("COMPLETED", "ERROR"): + await asyncio.sleep(30) + finally: + job.delete() + + async def run_sync_query(self, query: str) -> None: + """Run the query synchronously. + + Parameters + ---------- + query + Query string to execute. + """ + if not self._client: + raise RuntimeError("TAPBusiness startup never ran") + self.logger.info(f"Running (sync): {query}") + loop = asyncio.get_event_loop() + await loop.run_in_executor(self._pool, self._client.search, query) + + def dump(self) -> TAPBusinessData: + return TAPBusinessData( + running_query=self._running_query, **super().dump().dict() + ) + + def _make_client(self, token: str) -> pyvo.dal.TAPService: + """Create a TAP client. + + Parameters + ---------- + token + User authentication token. + + Returns + ------- + pyvo.dal.TAPService + TAP client object. + """ + if not config.environment_url: + raise RuntimeError("environment_url not set") + tap_url = str(config.environment_url).rstrip("/") + "/api/tap" + try: + s = requests.Session() + s.headers["Authorization"] = "Bearer " + token + auth = pyvo.auth.AuthSession() + auth.credentials.set("lsst-token", s) + auth.add_security_method_for_url(tap_url, "lsst-token") + auth.add_security_method_for_url(tap_url + "/sync", "lsst-token") + auth.add_security_method_for_url(tap_url + "/async", "lsst-token") + auth.add_security_method_for_url(tap_url + "/tables", "lsst-token") + return pyvo.dal.TAPService(tap_url, auth) + except Exception as e: + raise TAPClientError(e, user=self.user.username) from e diff --git a/src/mobu/services/business/tapqueryrunner.py b/src/mobu/services/business/tapqueryrunner.py index 95cfd016..f2c8b4c8 100644 --- a/src/mobu/services/business/tapqueryrunner.py +++ b/src/mobu/services/business/tapqueryrunner.py @@ -2,121 +2,19 @@ from __future__ import annotations -import asyncio -import importlib.resources -import math -from typing import Any, Protocol, Union, runtime_checkable -from concurrent.futures import ThreadPoolExecutor from random import SystemRandom -from enum import Enum -import jinja2 -import pyvo -import requests -import shortuuid -import yaml + from httpx import AsyncClient from structlog.stdlib import BoundLogger -from ...config import config -from ...exceptions import CodeExecutionError, TAPClientError -from ...models.business.tapqueryrunner import ( - TAPQueryRunnerData, - TAPQueryRunnerOptions, -) +from ...models.business.tapqueryrunner import TAPQueryRunnerOptions from ...models.user import AuthenticatedUser -from .base import Business - - -@runtime_checkable -class TAPQueryContext(Protocol): - """Query Context Protocol - Defines the methods that should be implemented for various query context implementations. - Query context: Where/how the collection of queries to be run is generated from. - """ - - def __init__(self, **_arg: Any) -> None: - ... - - class ContextTypes(Enum): - """Define different types of query contexts""" - - QUERY_LIST = "QUERY_LIST" # List of queries passed in as options - TEMPLATES = "TEMPLATES" # Templates from local filepath - - @property - def context_type(self) -> TAPQueryContext.ContextTypes: - """Get the context type""" - ... - - def get_next_query(self) -> str: - """Get the next query""" - ... - - -class TAPQueryContextTemplates: - """Context is template based here, i.e. the queries are read from local filepath as - Jinja templates. - """ - - def __init__(self, taprunner: TAPQueryRunner) -> None: - self.taprunner = taprunner - - @property - def context_type(self) -> TAPQueryContext.ContextTypes: - """Get Context Type - - Returns: - TAPQueryContext.ContextTypes: The context type - """ - return TAPQueryContext.ContextTypes.TEMPLATES - - def get_next_query(self) -> str: - """Get a query from the query_set randomly, using the random_engine of the TAP Runner - Render query from template, using generated parameters - - Returns: - str: The next query string - """ - template_name = self.taprunner.random_engine.choice( - self.taprunner.env.list_templates(["sql"]) - ) - template = self.taprunner.env.get_template(template_name) - query = template.render(self.taprunner.generated_params) - return query - - -class TAPQueryContextQueryList: - """Context for generating queries from a given list of query strings.""" - - def __init__(self, taprunner: TAPQueryRunner) -> None: - self.taprunner = taprunner - - @property - def context_type(self) -> TAPQueryContext.ContextTypes: - """Get Context Type - - Returns: - TAPQueryContext.ContextTypes: The context type - """ - return TAPQueryContext.ContextTypes.QUERY_LIST - - def get_next_query(self) -> str: - """Get a query from the list randomly, using the random_engine of the TAP Runner - - Returns: - str: The next query string - """ - return self.taprunner.random_engine.choice(self.taprunner.queries) - +from .tap import TAPBusiness -# Mapping of context types to TAPQueryContext class type -TAP_QUERY_CONTEXTS = { - TAPQueryContext.ContextTypes.QUERY_LIST: TAPQueryContextQueryList, - TAPQueryContext.ContextTypes.TEMPLATES: TAPQueryContextTemplates, -} +__all__ = ["TAPQueryRunner"] -class TAPQueryRunner(Business): +class TAPQueryRunner(TAPBusiness): """Run queries against TAP. Parameters @@ -139,237 +37,7 @@ def __init__( logger: BoundLogger, ) -> None: super().__init__(options, user, http_client, logger) - self._running_query: str | None = None - self._client: pyvo.dal.TAPService | None = None self._random = SystemRandom() - self._pool = ThreadPoolExecutor(max_workers=1) - self._context = self._get_context(options) - self._env = self._get_environment() - self._params = self._get_params() - self._queries = options.queries - - async def startup(self) -> None: - if self._context.context_type is TAPQueryContext.ContextTypes.TEMPLATES: - templates = self._env.list_templates(["sql"]) - self.logger.info("Query templates to choose from: %s", templates) - with self.timings.start("make_client"): - self._client = self._make_client(self.user.token) def get_next_query(self) -> str: - """Get the next query string from the context - - Returns: - str: The next query string - """ - return self._context.get_next_query() - - @property - def queries(self): - return self._queries - - @property - def env(self): - return self._env - - @property - def random_engine(self): - return self._random - - @property - def params(self): - return self._params - - @property - def generated_params(self): - return self._generate_parameters() - - def _get_context( - self, - options, - ) -> Union[TAPQueryContextQueryList, TAPQueryContextTemplates]: - """Get the context for this TAP query runner - - Parameters: - options (TAPQueryRunnerOptions): The runner options based on which to get the context - """ - if options.queries: - return TAP_QUERY_CONTEXTS[TAPQueryContext.ContextTypes.QUERY_LIST]( - taprunner=self - ) - return TAP_QUERY_CONTEXTS[TAPQueryContext.ContextTypes.TEMPLATES]( - taprunner=self - ) - - def _get_environment(self) -> Union[jinja2.Environment, None]: - """Get the jinha2 template if applicable else return None - - Returns: - Union[jinja2.Environment, None]: Return the jinja2 Environment, or None - """ - if self._context.context_type is not TAPQueryContext.ContextTypes.TEMPLATES: - return None - - # Load templates and parameters. The path has to be specified in two - # different ways: as a relative path for Jinja's PackageLoader, and as - # a sequence of joinpath operations for importlib.resources. - template_path = ("data", "tapqueryrunner", self.options.query_set) - env = jinja2.Environment( - loader=jinja2.PackageLoader("mobu", "/".join(template_path)), - undefined=jinja2.StrictUndefined, - autoescape=jinja2.select_autoescape(disabled_extensions=["sql"]), - ) - return env - - def _get_params(self) -> Union[dict, None]: - """Get the parameters as a dictionary if applicable else return None - - Returns: - Union[dict, None]: Return the parameters as a dict, or None - """ - if self._context.context_type is not TAPQueryContext.ContextTypes.TEMPLATES: - return None - template_path = ("data", "tapqueryrunner", self.options.query_set) - files = importlib.resources.files("mobu") - for directory in template_path: - files = files.joinpath(directory) - with files.joinpath("params.yaml").open("r") as f: - params = yaml.safe_load(f) - return params - - async def execute(self) -> None: - """Get and execute the next query from the context, synchronously or asynchronously - depending on options - """ - query = self.get_next_query() - with self.timings.start("execute_query", {"query": query}) as sw: - self._running_query = query - - try: - if self.options.sync: - await self.run_sync_query(query) - else: - await self.run_async_query(query) - except Exception as e: - raise CodeExecutionError( - user=self.user.username, - code=query, - code_type="TAP query", - error=f"{type(e).__name__}: {e!s}", - ) from e - - self._running_query = None - elapsed = sw.elapsed.total_seconds() - - self.logger.info(f"Query finished after {elapsed} seconds") - - async def run_async_query(self, query: str) -> None: - """Run the query asynchronously - - Parameters: - query (str): The query string to execute - """ - if not self._client: - raise RuntimeError("TAPQueryRunner startup never ran") - self.logger.info("Running (async): %s", query) - job = self._client.submit_job(query) - try: - job.run() - while job.phase not in ("COMPLETED", "ERROR"): - await asyncio.sleep(30) - finally: - job.delete() - - async def run_sync_query(self, query: str) -> None: - """Run the query synchronously - - Parameters: - query (str): The query string to execute - """ - if not self._client: - raise RuntimeError("TAPQueryRunner startup never ran") - self.logger.info("Running (sync): %s", query) - loop = asyncio.get_event_loop() - await loop.run_in_executor(self._pool, self._client.search, query) - - def dump(self) -> TAPQueryRunnerData: - return TAPQueryRunnerData( - running_query=self._running_query, **super().dump().dict() - ) - - def _make_client(self, token: str) -> pyvo.dal.TAPService: - if not config.environment_url: - raise RuntimeError("environment_url not set") - tap_url = str(config.environment_url).rstrip("/") + "/api/tap" - try: - s = requests.Session() - s.headers["Authorization"] = "Bearer " + token - auth = pyvo.auth.AuthSession() - auth.credentials.set("lsst-token", s) - auth.add_security_method_for_url(tap_url, "lsst-token") - auth.add_security_method_for_url(tap_url + "/sync", "lsst-token") - auth.add_security_method_for_url(tap_url + "/async", "lsst-token") - auth.add_security_method_for_url(tap_url + "/tables", "lsst-token") - return pyvo.dal.TAPService(tap_url, auth) - except Exception as e: - raise TAPClientError(e, user=self.user.username) from e - - def _generate_random_polygon( - self, - *, - min_ra: float, - max_ra: float, - min_dec: float, - max_dec: float, - min_radius: float, - radius_range: float, - ) -> str: - """Generate a random polygon as comma-separated ra/dec values.""" - ra = min_ra + self._random.random() * (max_ra - min_ra) - dec = min_dec + self._random.random() * (max_dec - min_dec) - r = min_radius + self._random.random() * radius_range - n = self._random.randrange(3, 8) - phi = self._random.random() * 2 * math.pi - poly = [] - for theta in [phi + i * 2 * math.pi / n for i in range(n)]: - poly.append(ra + r * math.sin(theta)) - poly.append(dec + r * math.cos(theta)) - return ", ".join([str(x) for x in poly]) - - def _generate_parameters(self) -> dict[str, int | float | str]: - """Generate some random parameters for the query.""" - min_ra = self._params.get("min_ra", 55.0) - max_ra = self._params.get("max_ra", 70.0) - min_dec = self._params.get("min_dec", -42.0) - max_dec = self._params.get("max_dec", -30.0) - min_radius = self._params.get("min_radius", 0.01) - radius_range = self._params.get("radius_range", 0.04) - radius_near_range = self._params.get("radius_near_range", 0.09) - min_flux = 0.0 + self._random.random() * 0.00100 - min_mag = 15.0 + self._random.random() * 15.0 - result = { - "ra": min_ra + self._random.random() * (max_ra - min_ra), - "dec": min_dec + self._random.random() * (max_dec - min_dec), - "min_flux": min_flux, - "max_flux": min_flux + 0.00001, - "min_mag": min_mag, - "max_mag": min_mag + 0.1, - "polygon": self._generate_random_polygon( - min_ra=min_ra, - max_ra=max_ra, - min_dec=min_dec, - max_dec=max_dec, - min_radius=min_radius, - radius_range=radius_range, - ), - "radius": min_radius + self._random.random() * radius_range, - "radius_near": (min_radius + self._random.random() * radius_near_range), - "username": self.user.username, - "query_id": "mobu-" + shortuuid.uuid(), - } - object_ids = self._params.get("object_ids") - if object_ids: - result["object"] = str(self._random.choice(object_ids)) - result["objects"] = ", ".join( - str(o) for o in self._random.choices(object_ids, k=12) - ) - return result + return self._random.choice(self.options.queries) diff --git a/src/mobu/services/business/tapquerysetrunner.py b/src/mobu/services/business/tapquerysetrunner.py new file mode 100644 index 00000000..3f997cc4 --- /dev/null +++ b/src/mobu/services/business/tapquerysetrunner.py @@ -0,0 +1,140 @@ +"""Run a set of predefined queries against a TAP service.""" + +from __future__ import annotations + +import importlib.resources +import math +from random import SystemRandom + +import jinja2 +import shortuuid +import yaml +from httpx import AsyncClient +from structlog.stdlib import BoundLogger + +from ...models.business.tapquerysetrunner import TAPQuerySetRunnerOptions +from ...models.user import AuthenticatedUser +from .tap import TAPBusiness + +__all__ = ["TAPQuerySetRunner"] + + +class TAPQuerySetRunner(TAPBusiness): + """Run queries from a predefined set against TAP with random parameters. + + Parameters + ---------- + options + Configuration options for the business. + user + User with their authentication token to use to run the business. + http_client + Shared HTTP client for general web access. + logger + Logger to use to report the results of business. + """ + + def __init__( + self, + options: TAPQuerySetRunnerOptions, + user: AuthenticatedUser, + http_client: AsyncClient, + logger: BoundLogger, + ) -> None: + super().__init__(options, user, http_client, logger) + self._random = SystemRandom() + + # Load templates and parameters. The path has to be specified in two + # different ways: as a relative path for Jinja's PackageLoader, and as + # a sequence of joinpath operations for importlib.resources. + template_path = ("data", "tapquerysetrunner", self.options.query_set) + self._env = jinja2.Environment( + loader=jinja2.PackageLoader("mobu", "/".join(template_path)), + undefined=jinja2.StrictUndefined, + autoescape=jinja2.select_autoescape(disabled_extensions=["sql"]), + ) + files = importlib.resources.files("mobu") + for directory in template_path: + files = files.joinpath(directory) + with files.joinpath("params.yaml").open("r") as f: + self._params = yaml.safe_load(f) + + async def startup(self) -> None: + await super().startup() + templates = self._env.list_templates(["sql"]) + self.logger.info("Query templates to choose from: %s", templates) + + def get_next_query(self) -> str: + """Choose a random query from the query set. + + Returns + ------- + str + Next TAP query to run. + """ + template_name = self._random.choice(self._env.list_templates(["sql"])) + template = self._env.get_template(template_name) + return template.render(self._generate_parameters()) + + def _generate_random_polygon( + self, + *, + min_ra: float, + max_ra: float, + min_dec: float, + max_dec: float, + min_radius: float, + radius_range: float, + ) -> str: + """Generate a random polygon as comma-separated ra/dec values.""" + ra = min_ra + self._random.random() * (max_ra - min_ra) + dec = min_dec + self._random.random() * (max_dec - min_dec) + r = min_radius + self._random.random() * radius_range + n = self._random.randrange(3, 8) + phi = self._random.random() * 2 * math.pi + poly = [] + for theta in [phi + i * 2 * math.pi / n for i in range(n)]: + poly.append(ra + r * math.sin(theta)) + poly.append(dec + r * math.cos(theta)) + return ", ".join([str(x) for x in poly]) + + def _generate_parameters(self) -> dict[str, int | float | str]: + """Generate some random parameters for the query.""" + min_ra = self._params.get("min_ra", 55.0) + max_ra = self._params.get("max_ra", 70.0) + min_dec = self._params.get("min_dec", -42.0) + max_dec = self._params.get("max_dec", -30.0) + min_radius = self._params.get("min_radius", 0.01) + radius_range = self._params.get("radius_range", 0.04) + radius_near_range = self._params.get("radius_near_range", 0.09) + min_flux = 0.0 + self._random.random() * 0.00100 + min_mag = 15.0 + self._random.random() * 15.0 + result = { + "ra": min_ra + self._random.random() * (max_ra - min_ra), + "dec": min_dec + self._random.random() * (max_dec - min_dec), + "min_flux": min_flux, + "max_flux": min_flux + 0.00001, + "min_mag": min_mag, + "max_mag": min_mag + 0.1, + "polygon": self._generate_random_polygon( + min_ra=min_ra, + max_ra=max_ra, + min_dec=min_dec, + max_dec=max_dec, + min_radius=min_radius, + radius_range=radius_range, + ), + "radius": min_radius + self._random.random() * radius_range, + "radius_near": ( + min_radius + self._random.random() * radius_near_range + ), + "username": self.user.username, + "query_id": "mobu-" + shortuuid.uuid(), + } + object_ids = self._params.get("object_ids") + if object_ids: + result["object"] = str(self._random.choice(object_ids)) + result["objects"] = ", ".join( + str(o) for o in self._random.choices(object_ids, k=12) + ) + return result diff --git a/src/mobu/services/monkey.py b/src/mobu/services/monkey.py index 2d1ae58a..144b4e39 100644 --- a/src/mobu/services/monkey.py +++ b/src/mobu/services/monkey.py @@ -22,6 +22,7 @@ from ..models.business.jupyterpythonloop import JupyterPythonLoopConfig from ..models.business.notebookrunner import NotebookRunnerConfig from ..models.business.tapqueryrunner import TAPQueryRunnerConfig +from ..models.business.tapquerysetrunner import TAPQuerySetRunnerConfig from ..models.monkey import MonkeyData, MonkeyState from ..models.user import AuthenticatedUser from .business.base import Business @@ -29,6 +30,7 @@ from .business.jupyterpythonloop import JupyterPythonLoop from .business.notebookrunner import NotebookRunner from .business.tapqueryrunner import TAPQueryRunner +from .business.tapquerysetrunner import TAPQuerySetRunner __all__ = ["Monkey"] @@ -99,6 +101,10 @@ def __init__( self.business = TAPQueryRunner( business_config.options, user, self._http_client, self._logger ) + elif isinstance(business_config, TAPQuerySetRunnerConfig): + self.business = TAPQuerySetRunner( + business_config.options, user, self._http_client, self._logger + ) else: msg = f"Unknown business config {business_config}" raise TypeError(msg) diff --git a/tests/business/tapqueryrunner_test.py b/tests/business/tapqueryrunner_test.py index 9643d58a..ed14da42 100644 --- a/tests/business/tapqueryrunner_test.py +++ b/tests/business/tapqueryrunner_test.py @@ -2,23 +2,12 @@ from __future__ import annotations -from pathlib import Path -from typing import cast from unittest.mock import ANY, patch import pytest import pyvo import respx -import structlog -import yaml from httpx import AsyncClient -from safir.dependencies.http_client import http_client_dependency -from safir.testing.slack import MockSlackWebhook - -import mobu -from mobu.models.business.tapqueryrunner import TAPQueryRunnerOptions -from mobu.models.user import AuthenticatedUser -from mobu.services.business.tapqueryrunner import TAPQueryRunner from ..support.gafaelfawr import mock_gafaelfawr from ..support.util import wait_for_business @@ -27,6 +16,10 @@ @pytest.mark.asyncio async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: mock_gafaelfawr(respx_mock) + queries = [ + "SELECT TOP 10 * FROM TAP_SCHEMA.tables", + "SELECT TOP 10 * FROM TAP_SCHEMA.columns", + ] with patch.object(pyvo.dal, "TAPService"): r = await client.put( @@ -36,7 +29,10 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: "count": 1, "user_spec": {"username_prefix": "testuser"}, "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, + "business": { + "type": "TAPQueryRunner", + "options": {"queries": queries}, + }, }, ) assert r.status_code == 201 @@ -63,216 +59,9 @@ async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: r = await client.get("/mobu/flocks/test/monkeys/testuser1/log") assert r.status_code == 200 assert "Running (sync): " in r.text + found = False + for query in queries: + if query in r.text: + found = True + assert found, "Ran one of the appropriate queries" assert "Query finished after " in r.text - - -@pytest.mark.asyncio -async def test_setup_error( - client: AsyncClient, - slack: MockSlackWebhook, - respx_mock: respx.Router, -) -> None: - """Test that client creation is deferred to setup. - - This also doubles as a test that failures during setup are recorded as a - failed test execution and result in a Slack alert. - """ - mock_gafaelfawr(respx_mock) - - r = await client.put( - "/mobu/flocks", - json={ - "name": "test", - "count": 1, - "users": [{"username": "tapuser"}], - "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, - }, - ) - assert r.status_code == 201 - - # Wait until we've finished at least one loop and check the results. - data = await wait_for_business(client, "tapuser") - assert data["business"]["failure_count"] == 1 - - assert slack.messages == [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ( - "Unable to create TAP client: DALServiceError:" - " No working capabilities endpoint provided" - ), - "verbatim": True, - }, - }, - { - "type": "section", - "fields": [ - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - { - "type": "mrkdwn", - "text": "*Exception type*\nTAPClientError", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Monkey*\ntest/tapuser", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*User*\ntapuser", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Event*\nmake_client", - "verbatim": True, - }, - ], - }, - {"type": "divider"}, - ] - } - ] - - -@pytest.mark.asyncio -async def test_alert( - client: AsyncClient, slack: MockSlackWebhook, respx_mock: respx.Router -) -> None: - mock_gafaelfawr(respx_mock) - - with patch.object(pyvo.dal, "TAPService") as mock: - mock.return_value.search.side_effect = [Exception("some error")] - - r = await client.put( - "/mobu/flocks", - json={ - "name": "test", - "count": 1, - "user_spec": {"username_prefix": "testuser"}, - "scopes": ["exec:notebook"], - "business": {"type": "TAPQueryRunner"}, - }, - ) - assert r.status_code == 201 - - # Wait until we've finished at least one loop and check the results. - data = await wait_for_business(client, "testuser1") - assert data["business"]["failure_count"] == 1 - - assert slack.messages == [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": "Error while running TAP query", - "verbatim": True, - }, - }, - { - "type": "section", - "fields": [ - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - {"type": "mrkdwn", "text": ANY, "verbatim": True}, - { - "type": "mrkdwn", - "text": "*Exception type*\nCodeExecutionError", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Monkey*\ntest/testuser1", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*User*\ntestuser1", - "verbatim": True, - }, - { - "type": "mrkdwn", - "text": "*Event*\nexecute_query", - "verbatim": True, - }, - ], - }, - ], - "attachments": [ - { - "blocks": [ - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ("*Error*\n" "```\nException: some error\n```"), - "verbatim": True, - }, - }, - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": ANY, - "verbatim": True, - }, - }, - ] - } - ], - } - ] - - -@pytest.mark.asyncio -async def test_random_object() -> None: - for query_set in ("dp0.1", "dp0.2"): - params_path = ( - Path(mobu.__file__).parent - / "data" - / "tapqueryrunner" - / query_set - / "params.yaml" - ) - with params_path.open("r") as f: - objects = [str(o) for o in yaml.safe_load(f)["object_ids"]] - - user = AuthenticatedUser( - username="user", scopes=["read:tap"], token="blah blah" - ) - logger = structlog.get_logger(__file__) - options = TAPQueryRunnerOptions(query_set=query_set) - http_client = await http_client_dependency() - with patch.object(pyvo.dal, "TAPService"): - runner = TAPQueryRunner(options, user, http_client, logger) - parameters = runner._generate_parameters() - - assert parameters["object"] in objects - random_objects = cast(str, parameters["objects"]).split(", ") - assert len(random_objects) == 12 - for obj in random_objects: - assert obj in objects - - -@pytest.mark.asyncio -async def test_query_list() -> None: - queries = [ - "SELECT TOP 10 * FROM TAP_SCHEMA.tables", - "SELECT TOP 10 * FROM TAP_SCHEMA.columns", - ] - user = AuthenticatedUser(username="user", scopes=["read:tap"], token="blah blah") - logger = structlog.get_logger(__file__) - options = TAPQueryRunnerOptions(queries=queries) - http_client = await http_client_dependency() - with patch.object(pyvo.dal, "TAPService"): - runner = TAPQueryRunner(options, user, http_client, logger) - generated_query = runner.get_next_query() - assert generated_query in queries diff --git a/tests/business/tapquerysetrunner_test.py b/tests/business/tapquerysetrunner_test.py new file mode 100644 index 00000000..3629ab0e --- /dev/null +++ b/tests/business/tapquerysetrunner_test.py @@ -0,0 +1,264 @@ +"""Tests for TAPQuerySetRunner.""" + +from __future__ import annotations + +from pathlib import Path +from typing import cast +from unittest.mock import ANY, patch + +import pytest +import pyvo +import respx +import structlog +import yaml +from httpx import AsyncClient +from safir.dependencies.http_client import http_client_dependency +from safir.testing.slack import MockSlackWebhook + +import mobu +from mobu.models.business.tapquerysetrunner import TAPQuerySetRunnerOptions +from mobu.models.user import AuthenticatedUser +from mobu.services.business.tapquerysetrunner import TAPQuerySetRunner + +from ..support.gafaelfawr import mock_gafaelfawr +from ..support.util import wait_for_business + + +@pytest.mark.asyncio +async def test_run(client: AsyncClient, respx_mock: respx.Router) -> None: + mock_gafaelfawr(respx_mock) + + with patch.object(pyvo.dal, "TAPService"): + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "user_spec": {"username_prefix": "testuser"}, + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. + data = await wait_for_business(client, "testuser1") + assert data == { + "name": "testuser1", + "business": { + "failure_count": 0, + "name": "TAPQuerySetRunner", + "success_count": 1, + "timings": ANY, + }, + "state": "RUNNING", + "user": { + "scopes": ["exec:notebook"], + "token": ANY, + "username": "testuser1", + }, + } + + # Get the log and check that we logged the query. + r = await client.get("/mobu/flocks/test/monkeys/testuser1/log") + assert r.status_code == 200 + assert "Running (sync): " in r.text + assert "Query finished after " in r.text + + +@pytest.mark.asyncio +async def test_setup_error( + client: AsyncClient, + slack: MockSlackWebhook, + respx_mock: respx.Router, +) -> None: + """Test that client creation is deferred to setup. + + This also doubles as a test that failures during setup are recorded as a + failed test execution and result in a Slack alert. + """ + mock_gafaelfawr(respx_mock) + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "users": [{"username": "tapuser"}], + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. + data = await wait_for_business(client, "tapuser") + assert data["business"]["failure_count"] == 1 + + assert slack.messages == [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ( + "Unable to create TAP client: DALServiceError:" + " No working capabilities endpoint provided" + ), + "verbatim": True, + }, + }, + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + { + "type": "mrkdwn", + "text": "*Exception type*\nTAPClientError", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Monkey*\ntest/tapuser", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*User*\ntapuser", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Event*\nmake_client", + "verbatim": True, + }, + ], + }, + {"type": "divider"}, + ] + } + ] + + +@pytest.mark.asyncio +async def test_alert( + client: AsyncClient, slack: MockSlackWebhook, respx_mock: respx.Router +) -> None: + mock_gafaelfawr(respx_mock) + + with patch.object(pyvo.dal, "TAPService") as mock: + mock.return_value.search.side_effect = [Exception("some error")] + + r = await client.put( + "/mobu/flocks", + json={ + "name": "test", + "count": 1, + "user_spec": {"username_prefix": "testuser"}, + "scopes": ["exec:notebook"], + "business": {"type": "TAPQuerySetRunner"}, + }, + ) + assert r.status_code == 201 + + # Wait until we've finished at least one loop and check the results. + data = await wait_for_business(client, "testuser1") + assert data["business"]["failure_count"] == 1 + + assert slack.messages == [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "Error while running TAP query", + "verbatim": True, + }, + }, + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + {"type": "mrkdwn", "text": ANY, "verbatim": True}, + { + "type": "mrkdwn", + "text": "*Exception type*\nCodeExecutionError", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Monkey*\ntest/testuser1", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*User*\ntestuser1", + "verbatim": True, + }, + { + "type": "mrkdwn", + "text": "*Event*\nexecute_query", + "verbatim": True, + }, + ], + }, + ], + "attachments": [ + { + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ( + "*Error*\n```\nException: some error\n```" + ), + "verbatim": True, + }, + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ANY, + "verbatim": True, + }, + }, + ] + } + ], + } + ] + + +@pytest.mark.asyncio +async def test_random_object() -> None: + for query_set in ("dp0.1", "dp0.2"): + params_path = ( + Path(mobu.__file__).parent + / "data" + / "tapquerysetrunner" + / query_set + / "params.yaml" + ) + with params_path.open("r") as f: + objects = [str(o) for o in yaml.safe_load(f)["object_ids"]] + + user = AuthenticatedUser( + username="user", scopes=["read:tap"], token="blah blah" + ) + logger = structlog.get_logger(__file__) + options = TAPQuerySetRunnerOptions(query_set=query_set) + http_client = await http_client_dependency() + with patch.object(pyvo.dal, "TAPService"): + runner = TAPQuerySetRunner(options, user, http_client, logger) + parameters = runner._generate_parameters() + + assert parameters["object"] in objects + random_objects = cast(str, parameters["objects"]).split(", ") + assert len(random_objects) == 12 + for obj in random_objects: + assert obj in objects