Skip to content

Commit

Permalink
Refactor support for a list of TAP queries
Browse files Browse the repository at this point in the history
Rather than add a new layer of abstraction to allow two different
configurations of TAPQueryRunner, abstract the common functionality
of businesses that run TAP queries, rename the old TAPQueryRunner
to TAPQuerySetRunner, and add a new TAPQueryRunner that takes a
simple list of queries. This follows the existing pattern for keeping
each business simple and supporting a variety of tests via separate
business classes.
  • Loading branch information
rra committed Dec 14, 2023
1 parent 70b4760 commit 39e0fc5
Show file tree
Hide file tree
Showing 45 changed files with 685 additions and 601 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20231214_124422_rra_DM_42182.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Backwards-incompatible changes

- Rename the existing `TAPQueryRunner` business to `TAPQuerySetRunner` to more accurately capture what it does. Add a new `TAPQueryRunner` business that runs queries chosen randomly from a list. Based on work by @stvoutsin.
36 changes: 36 additions & 0 deletions src/mobu/models/business/tap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Base models for TAP-related monkey business."""

from __future__ import annotations

from pydantic import Field

from .base import BusinessData, BusinessOptions

__all__ = [
"TAPBusinessData",
"TAPBusinessOptions",
]


class TAPBusinessOptions(BusinessOptions):
"""Options for any business that runs TAP queries."""

sync: bool = Field(
True,
title="Whether to run TAP queries as sync or async",
description=(
"By default, queries to TAP are run via the sync endpoint."
" Set this to false to run as an async query."
),
example=True,
)


class TAPBusinessData(BusinessData):
"""Status of a running TAPQueryRunner business."""

running_query: str | None = Field(
None,
title="Currently running query",
description="Will not be present if no query is being executed",
)
52 changes: 15 additions & 37 deletions src/mobu/models/business/tapqueryrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,58 +6,36 @@

from pydantic import Field

from .base import BusinessConfig, BusinessData, BusinessOptions
from .base import BusinessConfig
from .tap import TAPBusinessOptions

__all__ = [
"TAPQueryRunnerConfig",
"TAPQueryRunnerData",
"TAPQueryRunnerOptions",
]


class TAPQueryRunnerOptions(BusinessOptions):
class TAPQueryRunnerOptions(TAPBusinessOptions):
"""Options for TAPQueryRunner monkey business."""

query_set: str = Field(
"dp0.1",
title="Which query template set to use for a TapQueryRunner",
example="dp0.2",
)

sync: bool = Field(
True,
title="Whether to run TAP queries as sync or async",
description=(
"By default, queries to TAP are run via the sync endpoint."
" Set this to false to run as an async query."
),
example=True,
)

queries: list | None = Field(
None,
title="Which query list to use for the TapQueryRunner",
description="List of queries to be run instead of a query_set",
example=True,
queries: list[str] = Field(
...,
title="TAP queries",
description="List of queries to be run",
example=[
"SELECT TOP 10 * FROM TAP_SCHEMA.schemas",
"SELECT TOP 10 * FROM MYDB.MyTable",
],
)


class TAPQueryRunnerConfig(BusinessConfig):
"""Configuration specialization for TAPQueryRunner."""

type: Literal["TAPQueryRunner"] = Field(..., title="Type of business to run")

options: TAPQueryRunnerOptions = Field(
default_factory=TAPQueryRunnerOptions,
title="Options for the monkey business",
type: Literal["TAPQueryRunner"] = Field(
..., title="Type of business to run"
)


class TAPQueryRunnerData(BusinessData):
"""Status of a running TAPQueryRunner business."""

running_query: str | None = Field(
None,
title="Currently running query",
description="Will not be present if no query is being executed",
options: TAPQueryRunnerOptions = Field(
..., title="Options for the monkey business"
)
38 changes: 38 additions & 0 deletions src/mobu/models/business/tapquerysetrunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Models for the TAPQuerySetRunner monkey business."""

from __future__ import annotations

from typing import Literal

from pydantic import Field

from .base import BusinessConfig
from .tap import TAPBusinessOptions

__all__ = [
"TAPQuerySetRunnerConfig",
"TAPQuerySetRunnerOptions",
]


class TAPQuerySetRunnerOptions(TAPBusinessOptions):
"""Options for TAPQueryRunner monkey business."""

query_set: str = Field(
"dp0.1",
title="Which query template set to use for a TapQueryRunner",
example="dp0.2",
)


class TAPQuerySetRunnerConfig(BusinessConfig):
"""Configuration specialization for TAPQuerySetRunner."""

type: Literal["TAPQuerySetRunner"] = Field(
..., title="Type of business to run"
)

options: TAPQuerySetRunnerOptions = Field(
default_factory=TAPQuerySetRunnerOptions,
title="Options for the monkey business",
)
2 changes: 2 additions & 0 deletions src/mobu/models/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .business.jupyterpythonloop import JupyterPythonLoopConfig
from .business.notebookrunner import NotebookRunnerConfig
from .business.tapqueryrunner import TAPQueryRunnerConfig
from .business.tapquerysetrunner import TAPQuerySetRunnerConfig
from .monkey import MonkeyData
from .user import User, UserSpec

Expand Down Expand Up @@ -49,6 +50,7 @@ class FlockConfig(BaseModel):

business: (
TAPQueryRunnerConfig
| TAPQuerySetRunnerConfig
| NotebookRunnerConfig
| JupyterPythonLoopConfig
| EmptyLoopConfig
Expand Down
4 changes: 2 additions & 2 deletions src/mobu/models/monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .business.base import BusinessData
from .business.notebookrunner import NotebookRunnerData
from .business.nublado import NubladoBusinessData
from .business.tapqueryrunner import TAPQueryRunnerData
from .business.tap import TAPBusinessData
from .user import AuthenticatedUser


Expand Down Expand Up @@ -38,7 +38,7 @@ class MonkeyData(BaseModel):
# to avoid the risk that Pydantic plus FastAPI will interpret a class as
# its parent class.
business: (
TAPQueryRunnerData
TAPBusinessData
| NotebookRunnerData
| NubladoBusinessData
| BusinessData
Expand Down
160 changes: 160 additions & 0 deletions src/mobu/services/business/tap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Base class for executing TAP queries."""

from __future__ import annotations

import asyncio
from abc import ABCMeta, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Generic, TypeVar

import pyvo
import requests
from httpx import AsyncClient
from structlog.stdlib import BoundLogger

from ...config import config
from ...exceptions import CodeExecutionError, TAPClientError
from ...models.business.tap import TAPBusinessData, TAPBusinessOptions
from ...models.user import AuthenticatedUser
from .base import Business

T = TypeVar("T", bound="TAPBusinessOptions")

__all__ = ["TAPBusiness"]


class TAPBusiness(Business, Generic[T], metaclass=ABCMeta):
"""Base class for business that executes TAP query.
This class modifies the core `~mobu.business.base.Business` loop by
providing `startup`, `execute`, and `shutdown` methods that know how to
execute TAP queries. Subclasses must override `get_next_query` to return
the next query they want to execute.
Parameters
----------
options
Configuration options for the business.
user
User with their authentication token to use to run the business.
http_client
Shared HTTP client for general web access.
logger
Logger to use to report the results of business.
"""

def __init__(
self,
options: T,
user: AuthenticatedUser,
http_client: AsyncClient,
logger: BoundLogger,
) -> None:
super().__init__(options, user, http_client, logger)
self._running_query: str | None = None
self._client: pyvo.dal.TAPService | None = None
self._pool = ThreadPoolExecutor(max_workers=1)

async def startup(self) -> None:
with self.timings.start("make_client"):
self._client = self._make_client(self.user.token)

@abstractmethod
def get_next_query(self) -> str:
"""Get the next TAP query to run.
Returns
-------
str
TAP query as a string.
"""

async def execute(self) -> None:
query = self.get_next_query()
with self.timings.start("execute_query", {"query": query}) as sw:
self._running_query = query

try:
if self.options.sync:
await self.run_sync_query(query)
else:
await self.run_async_query(query)
except Exception as e:
raise CodeExecutionError(
user=self.user.username,
code=query,
code_type="TAP query",
error=f"{type(e).__name__}: {e!s}",
) from e

self._running_query = None
elapsed = sw.elapsed.total_seconds()

self.logger.info(f"Query finished after {elapsed} seconds")

async def run_async_query(self, query: str) -> None:
"""Run the query asynchronously.
Parameters
----------
query
Query string to execute.
"""
if not self._client:
raise RuntimeError("TAPBusiness startup never ran")
self.logger.info(f"Running (async): {query}")
job = self._client.submit_job(query)
try:
job.run()
while job.phase not in ("COMPLETED", "ERROR"):
await asyncio.sleep(30)
finally:
job.delete()

async def run_sync_query(self, query: str) -> None:
"""Run the query synchronously.
Parameters
----------
query
Query string to execute.
"""
if not self._client:
raise RuntimeError("TAPBusiness startup never ran")
self.logger.info(f"Running (sync): {query}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(self._pool, self._client.search, query)

def dump(self) -> TAPBusinessData:
return TAPBusinessData(
running_query=self._running_query, **super().dump().dict()
)

def _make_client(self, token: str) -> pyvo.dal.TAPService:
"""Create a TAP client.
Parameters
----------
token
User authentication token.
Returns
-------
pyvo.dal.TAPService
TAP client object.
"""
if not config.environment_url:
raise RuntimeError("environment_url not set")
tap_url = str(config.environment_url).rstrip("/") + "/api/tap"
try:
s = requests.Session()
s.headers["Authorization"] = "Bearer " + token
auth = pyvo.auth.AuthSession()
auth.credentials.set("lsst-token", s)
auth.add_security_method_for_url(tap_url, "lsst-token")
auth.add_security_method_for_url(tap_url + "/sync", "lsst-token")
auth.add_security_method_for_url(tap_url + "/async", "lsst-token")
auth.add_security_method_for_url(tap_url + "/tables", "lsst-token")
return pyvo.dal.TAPService(tap_url, auth)
except Exception as e:
raise TAPClientError(e, user=self.user.username) from e
Loading

0 comments on commit 39e0fc5

Please sign in to comment.