Skip to content

Commit

Permalink
feat: various usability fixes
Browse files Browse the repository at this point in the history
feat: moves CLI and controller construction into builder functions
feat: jobs can be configured with list of strings as well
feat: plugin has a get_queues function
feat: allow worker to schedule in running event loop
  • Loading branch information
cofin authored Oct 13, 2023
2 parents 46f7cd5 + 2067ad9 commit fcc6a38
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 467 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ cython_debug/
#.idea/

.vscode/
examples/tmp.py
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: conventional-pre-commit
stages: [commit-msg]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.5.0
hooks:
- id: check-ast
- id: check-case-conflict
Expand All @@ -33,7 +33,7 @@ repos:
- id: black
args: [--config=./pyproject.toml]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.5.1"
rev: "v1.6.0"
hooks:
- id: mypy
exclude: "docs"
Expand Down
9 changes: 2 additions & 7 deletions litestar_saq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
from __future__ import annotations

from . import controllers, info
from .base import CronJob, Job, Queue, Worker
from .config import QueueConfig, SAQConfig
from .controllers import SAQController
from .plugin import SAQPlugin

__all__ = [
__all__ = (
"SAQPlugin",
"SAQConfig",
"SAQController",
"QueueConfig",
"Queue",
"CronJob",
"Job",
"Worker",
"info",
"controllers",
]
)
File renamed without changes.
26 changes: 14 additions & 12 deletions litestar_saq/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, cast

from saq import Job as SaqJob
from saq import Worker as SaqWorker
from saq.job import CronJob as SaqCronJob
from saq.queue import Queue as SaqQueue

from litestar_saq._util import import_string

if TYPE_CHECKING:
from collections.abc import Collection
from signal import Signals
Expand All @@ -21,16 +23,21 @@
class Job(SaqJob):
"""Job Details"""

job_name: str | None = None
job_description: str | None = None


@dataclass
class CronJob(SaqCronJob):
    """Cron Job Details

    Extends ``saq``'s :class:`CronJob` with display metadata and support for
    configuring the cron function as a dotted import path instead of a
    callable.
    """

    # Optional human-readable name, used for display purposes.
    job_name: str | None = None
    # Optional human-readable description, used for display purposes.
    job_description: str | None = None
    # Arbitrary user metadata attached to the job.
    meta: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # ``function`` (inherited from ``SaqCronJob``) may be configured as an
        # import string; resolve it eagerly so a bad path fails at
        # configuration time rather than when the job first runs.
        self.function = self._get_or_import_function(self.function)

    @staticmethod
    def _get_or_import_function(function_or_import_string: str | Function) -> Function:
        """Return the job callable, importing it from a dotted path if needed.

        Strings (e.g. ``"app.tasks.cleanup"``) are resolved via
        ``import_string``; callables are passed through unchanged.
        """
        if isinstance(function_or_import_string, str):
            return cast("Function", import_string(function_or_import_string))
        return function_or_import_string


class Queue(SaqQueue):
Expand Down Expand Up @@ -58,11 +65,6 @@ def __init__(
self._namespace = queue_namespace if queue_namespace is not None else "saq"
super().__init__(redis, name, dump, load, max_concurrent_ops)

def temp(self, *args: Any, **kwargs: Any) -> None:
"""Initialize a new queue."""
self._namespace = kwargs.pop("queue_namespace", "saq")
super().__init__(*args, **kwargs)

def namespace(self, key: str) -> str:
"""Make the namespace unique per app."""
return f"{self._namespace}:{self.name}:{key}"
Expand Down
127 changes: 69 additions & 58 deletions litestar_saq/cli.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,72 @@
from __future__ import annotations

import asyncio
import multiprocessing
from contextlib import suppress
from typing import TYPE_CHECKING, cast

from click import IntRange, group, option
from litestar.cli._utils import LitestarGroup, _format_is_enabled, console
from rich.table import Table
from saq import __version__ as saq_version

from litestar_saq.exceptions import ImproperConfigurationError
from litestar_saq.plugin import SAQPlugin
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from click import Group
from litestar import Litestar
from litestar.logging.config import BaseLoggingConfig

from litestar_saq.base import Worker
from litestar_saq.plugin import SAQPlugin


@group(cls=LitestarGroup, name="tasks")
def background_worker_group() -> None:
"""Manage background task workers."""


@background_worker_group.command(
name="run-worker",
help="Run background worker processes.",
)
@option(
"--workers",
help="The number of worker processes to start.",
type=IntRange(min=1),
default=1,
required=False,
show_default=True,
)
@option("-v", "--verbose", help="Enable verbose logging.", is_flag=True, default=None, type=bool, required=False)
@option("-d", "--debug", help="Enable debugging.", is_flag=True, default=None, type=bool, required=False)
def run_worker(
app: Litestar,
workers: int,
verbose: bool | None,
debug: bool | None,
) -> None:
"""Run the API server."""
console.rule("[yellow]Starting SAQ Workers[/]", align="left")
if app.logging_config is not None:
app.logging_config.configure()
if debug is not None:
app.debug = True
if verbose is not None:
"""todo: set the logging level here"""
_log_level = "debug"
plugin = get_saq_plugin(app)
show_saq_info(app, workers, plugin)
if workers > 1:
for _ in range(workers - 1):
p = multiprocessing.Process(target=run_worker_process, args=(plugin.get_workers(), app.logging_config))
p.start()
def build_cli_app() -> Group:
import asyncio
import multiprocessing
from typing import cast

try:
run_worker_process(workers=plugin.get_workers(), logging_config=cast("BaseLoggingConfig", app.logging_config))
except KeyboardInterrupt:
loop = asyncio.get_event_loop()
for worker_instance in plugin.get_workers():
loop.run_until_complete(worker_instance.stop())
from click import IntRange, group, option
from litestar.cli._utils import LitestarGroup, console

@group(cls=LitestarGroup, name="workers")
def background_worker_group() -> None:
"""Manage background task workers."""

@background_worker_group.command(
name="run",
help="Run background worker processes.",
)
@option(
"--workers",
help="The number of worker processes to start.",
type=IntRange(min=1),
default=1,
required=False,
show_default=True,
)
@option("-v", "--verbose", help="Enable verbose logging.", is_flag=True, default=None, type=bool, required=False)
@option("-d", "--debug", help="Enable debugging.", is_flag=True, default=None, type=bool, required=False)
def run_worker(
app: Litestar,
workers: int,
verbose: bool | None,
debug: bool | None,
) -> None:
"""Run the API server."""
console.rule("[yellow]Starting SAQ Workers[/]", align="left")
if app.logging_config is not None:
app.logging_config.configure()
if debug is not None or verbose is not None:
app.debug = True
plugin = get_saq_plugin(app)
show_saq_info(app, workers, plugin)
if workers > 1:
for _ in range(workers - 1):
p = multiprocessing.Process(target=run_worker_process, args=(plugin.get_workers(), app.logging_config))
p.start()

try:
run_worker_process(
workers=plugin.get_workers(),
logging_config=cast("BaseLoggingConfig", app.logging_config),
)
except KeyboardInterrupt:
loop = asyncio.get_event_loop()
for worker_instance in plugin.get_workers():
loop.run_until_complete(worker_instance.stop())

return background_worker_group


def get_saq_plugin(app: Litestar) -> SAQPlugin:
Expand All @@ -76,6 +76,11 @@ def get_saq_plugin(app: Litestar) -> SAQPlugin:
If plugin is not found, it raises an ImproperlyConfiguredException.
"""

from contextlib import suppress

from litestar_saq.exceptions import ImproperConfigurationError
from litestar_saq.plugin import SAQPlugin

with suppress(KeyError):
return app.plugins.get(SAQPlugin)
msg = "Failed to initialize SAQ. The required plugin (SAQPlugin) is missing."
Expand All @@ -87,6 +92,10 @@ def get_saq_plugin(app: Litestar) -> SAQPlugin:
def show_saq_info(app: Litestar, workers: int, plugin: SAQPlugin) -> None: # pragma: no cover
"""Display basic information about the application and its configuration."""

from litestar.cli._utils import _format_is_enabled, console
from rich.table import Table
from saq import __version__ as saq_version

table = Table(show_header=False)
table.add_column("title", style="cyan")
table.add_column("value", style="bright_blue")
Expand All @@ -101,6 +110,8 @@ def show_saq_info(app: Litestar, workers: int, plugin: SAQPlugin) -> None: # pr

def run_worker_process(workers: list[Worker], logging_config: BaseLoggingConfig | None) -> None:
"""Run a worker."""
import asyncio

loop = asyncio.get_event_loop()
if logging_config is not None:
logging_config.configure()
Expand Down
50 changes: 43 additions & 7 deletions litestar_saq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from litestar.exceptions import ImproperlyConfiguredException
from litestar.serialization import decode_json, encode_json
from redis.asyncio import ConnectionPool, Redis
from saq.queue import Queue as SaqQueue
from saq.types import DumpType, LoadType, PartialTimersDict, QueueInfo, QueueStats, ReceivesContext

from litestar_saq._util import import_string, module_to_os_path
from litestar_saq.base import CronJob, Job, Queue, Worker
from litestar_saq.util import module_to_os_path

if TYPE_CHECKING:
from typing import Any
Expand All @@ -37,6 +38,22 @@ def _get_static_files() -> Path:
return Path(module_to_os_path("saq.web") / "static")


TaskQueue = SaqQueue | Queue


@dataclass
class TaskQueues:
__slots__ = ("_queues",)
_queues: dict[str, TaskQueue] = field(default_factory=dict)

def get(self, name: str) -> TaskQueue:
queue = self._queues.get(name)
if queue is not None:
return queue
msg = "Could not find the specified queue. Please check your configuration."
raise ImproperlyConfiguredException(msg)


@dataclass
class SAQConfig:
"""SAQ Configuration."""
Expand All @@ -49,7 +66,7 @@ class SAQConfig:
"""Redis URL to connect with."""
namespace: str = "saq"
"""Namespace to use for Redis"""
queue_instances: dict[str, Queue] | None = None
queue_instances: dict[str, Queue | SaqQueue] | None = None
"""Current configured queue instances. When None, queues will be auto-created on startup"""
queues_dependency_key: str = field(default="task_queues")
"""Key to use for storing dependency information in litestar."""
Expand Down Expand Up @@ -84,7 +101,14 @@ def signature_namespace(self) -> dict[str, Any]:
Returns:
A string keyed dict of names to be added to the namespace for signature forward reference resolution.
"""
return {"Queue": Queue, "Worker": Worker, "QueueInfo": QueueInfo, "Job": Job, "QueueStats": QueueStats}
return {
"Queue": Queue,
"Worker": Worker,
"QueueInfo": QueueInfo,
"Job": Job,
"QueueStats": QueueStats,
"TaskQueues": TaskQueues,
}

async def on_shutdown(self, app: Litestar) -> None:
"""Disposes of the SAQ Workers.
Expand Down Expand Up @@ -125,14 +149,14 @@ def get_redis(self) -> Redis:
self.redis = Redis(connection_pool=pool)
return self.redis

def get_queues(self) -> dict[str, Queue]:
def get_queues(self) -> TaskQueues:
"""Get the configured SAQ queues.
Returns:
Dictionary of queues.
"""
if self.queue_instances is not None:
return self.queue_instances
return TaskQueues(_queues=self.queue_instances)
self.queue_instances = {}
for queue_config in self.queue_configs:
self.queue_instances[queue_config.name] = Queue(
Expand All @@ -143,7 +167,7 @@ def get_queues(self) -> dict[str, Queue]:
load=self.json_deserializer,
max_concurrent_ops=queue_config.max_concurrent_ops,
)
return self.queue_instances
return TaskQueues(_queues=self.queue_instances)

def create_app_state_items(self) -> dict[str, Any]:
"""Key/value pairs to be stored in application state."""
Expand All @@ -170,7 +194,7 @@ class QueueConfig:
"""The name of the queue to create."""
concurrency: int = 10
"""Number of jobs to process concurrently"""
max_concurrent_ops: int = 20
max_concurrent_ops: int = 15
"""Maximum concurrent operations. (default 20)
This throttles calls to `enqueue`, `job`, and `abort` to prevent the Queue
from consuming too many Redis connections."""
Expand All @@ -194,3 +218,15 @@ class QueueConfig:
abort: how often to check if a job is aborted"""
dequeue_timeout: float = 0
"""How long it will wait to dequeue"""
separate_process: bool = True
"""Executes as a separate event loop when True.
Set it False to execute within the Litestar application."""

def __post_init__(self) -> None:
    """Normalize the configured task list.

    ``tasks`` entries may be dotted import strings as well as callables;
    resolve strings to callables at configuration time so misconfigured
    paths fail at startup.
    """
    self.tasks = [self._get_or_import_task(task) for task in self.tasks]

@staticmethod
def _get_or_import_task(task_or_import_string: str | ReceivesContext) -> ReceivesContext:
    """Return the task callable, importing it from a dotted path if needed.

    Strings are resolved via ``import_string``; callables are passed
    through unchanged.
    """
    if isinstance(task_or_import_string, str):
        return cast("ReceivesContext", import_string(task_or_import_string))
    return task_or_import_string
Loading

0 comments on commit fcc6a38

Please sign in to comment.