Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new base class for exceptions, added templates #393

Merged
merged 7 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pytz = "*"
orjson = { version = "^3", optional = true }
msgpack = { version = "^1.0.7", optional = true }
cbor2 = { version = "^5", optional = true }
izulu = "0.5.4"

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
Expand Down
4 changes: 2 additions & 2 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
from taskiq.depends.progress_tracker import TaskProgress
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskiqError
from taskiq.exceptions import UnknownTaskError
from taskiq.message import BrokerMessage
from taskiq.receiver import Receiver
from taskiq.utils import maybe_awaitable
Expand Down Expand Up @@ -156,7 +156,7 @@
"""
target_task = self.find_task(message.task_name)
if target_task is None:
raise TaskiqError("Unknown task.")
raise UnknownTaskError(task_name=message.task_name)

Check warning on line 159 in taskiq/brokers/inmemory_broker.py

View check run for this annotation

Codecov / codecov/patch

taskiq/brokers/inmemory_broker.py#L159

Added line #L159 was not covered by tests

receiver_cb = self.receiver.callback(message=message.message)
if self.await_inplace:
Expand Down
9 changes: 3 additions & 6 deletions taskiq/brokers/shared_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from taskiq.abc.broker import AsyncBroker
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.exceptions import TaskiqError
from taskiq.exceptions import SharedBrokerListenError, SharedBrokerSendTaskError
from taskiq.kicker import AsyncKicker
from taskiq.message import BrokerMessage

Expand Down Expand Up @@ -56,10 +56,7 @@
:param message: message to send.
:raises TaskiqError: if called.
"""
raise TaskiqError(
"You cannot use kiq directly on shared task "
"without setting the default_broker.",
)
raise SharedBrokerSendTaskError

Check warning on line 59 in taskiq/brokers/shared_broker.py

View check run for this annotation

Codecov / codecov/patch

taskiq/brokers/shared_broker.py#L59

Added line #L59 was not covered by tests

async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
"""
Expand All @@ -69,7 +66,7 @@

:raises TaskiqError: if called.
"""
raise TaskiqError("Shared broker cannot listen")
raise SharedBrokerListenError

Check warning on line 69 in taskiq/brokers/shared_broker.py

View check run for this annotation

Codecov / codecov/patch

taskiq/brokers/shared_broker.py#L69

Added line #L69 was not covered by tests

def _register_task(
self,
Expand Down
59 changes: 58 additions & 1 deletion taskiq/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,103 @@
class TaskiqError(Exception):
from typing import Optional

from izulu import root


class TaskiqError(root.Error):
"""Base exception for all errors."""

__template__ = "Exception occurred"


class TaskiqResultTimeoutError(TaskiqError):
"""Waiting for task results has timed out."""

__template__ = "Waiting for task results has timed out, timeout={timeout}"
timeout: Optional[float] = None


class BrokerError(TaskiqError):
"""Base class for all broker errors."""

__template__ = "Base exception for all broker errors"


class ListenError(TaskiqError):
"""Error if the broker is unable to listen to the queue."""


class SharedBrokerListenError(ListenError):
"""Error when someone tries to listen to the queue with shared broker."""

__template__ = "Shared broker cannot listen"


class SendTaskError(BrokerError):
"""Error if the broker was unable to send the task to the queue."""

__template__ = "Cannot send task to the queue"


class SharedBrokerSendTaskError(SendTaskError):
"""Error when someone tries to send task with shared broker."""

__template__ = (
"You cannot use kiq directly on shared task "
"without setting the default_broker."
)


class UnknownTaskError(SendTaskError):
"""Error if task is unknown."""

__template__ = "Cannot send unknown task to the queue, task name - {task_name}"
task_name: str


class ResultBackendError(TaskiqError):
"""Base class for all ResultBackend errors."""

__template__ = "Base exception for all result backend errors"


class ResultGetError(ResultBackendError):
"""Error if ResultBackend was unable to get result."""

__template__ = "Cannot get result for the task"


class ResultSetError(ResultBackendError):
"""Error if ResultBackend was unable to set result."""

__template__ = "Cannot set result for the task"


class ResultIsReadyError(ResultBackendError):
"""Error if ResultBackend was unable to find out if the task is ready."""

__template__ = "Cannot find out if the task is ready"


class SecurityError(TaskiqError):
"""Security related exception."""

__template__ = "Security exception occurred: {description}"
description: str


class NoResultError(TaskiqError):
"""Error if user does not want to set result."""

__template__ = "User doesn't want to set result"


class TaskRejectedError(TaskiqError):
"""Task was rejected."""

__template__ = "Task was rejected"


class ScheduledTaskCancelledError(TaskiqError):
"""Scheduled task was cancelled and not sent to the queue."""

__template__ = "Cannot send scheduled task to the queue."
2 changes: 1 addition & 1 deletion taskiq/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def check_task(task: AsyncTaskiqTask[Any]) -> None:

while task_ids:
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError("Timed out")
raise TaskiqResultTimeoutError(timeout=timeout)
check_tasks = []
for task in tasks:
check_tasks.append(loop.create_task(check_task(task)))
Expand Down
4 changes: 3 additions & 1 deletion taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ async def prefetcher(
"""
fetched_tasks: int = 0
iterator = self.broker.listen()
current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task(
current_message: asyncio.Task[
Union[bytes, AckableMessage]
] = asyncio.create_task(
iterator.__anext__(), # type: ignore
)

Expand Down
5 changes: 4 additions & 1 deletion taskiq/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,10 @@ def exception_to_python(
if not isinstance(cls, type) or not issubclass(cls, BaseException):
fake_exc_type = exc_type if exc_module is None else f"{exc_module}.{exc_type}"
raise taskiq.exceptions.SecurityError(
f"Expected an exception class, got {fake_exc_type} with payload {exc_msg}",
description=(
f"Expected an exception class, "
f"got {fake_exc_type} with payload {exc_msg}"
),
)

# XXX: Without verifying `cls` is actually an exception class,
Expand Down
2 changes: 1 addition & 1 deletion taskiq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def wait_result(
while not await self.is_ready():
await asyncio.sleep(check_interval)
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError
raise TaskiqResultTimeoutError(timeout=timeout)
return await self.get_result(with_logs=with_logs)

async def get_progress(self) -> "Optional[TaskProgress[Any]]":
Expand Down
Loading