diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 612c41ea7d7..ddf4004f00e 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -42,7 +42,7 @@ def get_channel_name(cls) -> str: def routing_key(self) -> str | None: """this is used to define the topic of the message - :return: the topic or None (NOTE: None will implicitely use a FANOUT exchange) + :return: the topic or None (NOTE: None will implicitly use a FANOUT exchange) """ def body(self) -> bytes: diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 8392fb32995..7856547cc9b 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -84,17 +84,18 @@ async def rabbit_service( async def create_rabbitmq_client( rabbit_service: RabbitSettings, ) -> AsyncIterator[Callable[[str], RabbitMQClient]]: - created_clients = [] + created_clients: list[RabbitMQClient] = [] def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQClient: + # pylint: disable=protected-access client = RabbitMQClient( f"pytest_{client_name}", rabbit_service, heartbeat=heartbeat ) assert client - assert client._connection_pool # pylint: disable=protected-access - assert not client._connection_pool.is_closed # pylint: disable=protected-access - assert client._channel_pool # pylint: disable=protected-access - assert not client._channel_pool.is_closed # pylint: disable=protected-access + assert client._connection_pool # noqa: SLF001 + assert not client._connection_pool.is_closed # noqa: SLF001 + assert client._channel_pool # noqa: SLF001 + assert not client._channel_pool.is_closed # noqa: SLF001 assert client.client_name == f"pytest_{client_name}" assert client.settings == rabbit_service created_clients.append(client) @@ -129,7 +130,6 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien await asyncio.gather(*(client.close() for client in created_clients)) - async def rabbitmq_client(create_rabbitmq_client): # NOTE: Legacy fixture # Use create_rabbitmq_client instead of rabbitmq_client diff --git a/packages/service-library/requirements/_test.in b/packages/service-library/requirements/_test.in index 8f68374d05f..ed48e958bdd 100644 --- a/packages/service-library/requirements/_test.in +++ b/packages/service-library/requirements/_test.in @@ -17,7 +17,6 @@ coverage docker faker flaky -respx openapi-spec-validator pytest pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76 @@ -30,3 +29,4 @@ pytest-runner pytest-sugar pytest-xdist python-dotenv +respx diff --git a/packages/service-library/src/servicelib/aiohttp/monitor_slow_callbacks.py b/packages/service-library/src/servicelib/aiohttp/monitor_slow_callbacks.py index ed7f534171c..4da2374f2b2 100644 --- a/packages/service-library/src/servicelib/aiohttp/monitor_slow_callbacks.py +++ b/packages/service-library/src/servicelib/aiohttp/monitor_slow_callbacks.py @@ -19,7 +19,7 @@ def enable( from aiodebug.logging_compat import get_logger aio_debug_logger = get_logger(__name__) - _run = asyncio.events.Handle._run + _run = asyncio.events.Handle._run # noqa: SLF001 profiler = Profiler(interval=slow_duration_secs, async_mode="disabled") @@ -48,4 +48,4 @@ def instrumented(self): return retval - asyncio.events.Handle._run = instrumented # type: ignore[method-assign] + asyncio.events.Handle._run = instrumented # type: ignore[method-assign] # noqa: SLF001 diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 96de471ed34..33502ff710c 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -1,7 +1,8 @@ import asyncio import logging from dataclasses import dataclass, field -from typing import Final +from functools import partial +from typing import Any, Final import aio_pika from pydantic import NonNegativeInt @@ -20,6 +21,68 @@ _DEFAULT_PREFETCH_VALUE: Final[int] = 10 _DEFAULT_RABBITMQ_EXECUTION_TIMEOUT_S: Final[int] = 5 +_HEADER_X_DEATH: Final[str] = "x-death" + +_DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S: Final[float] = 1 +_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15 + +_DELAYED_EXCHANGE_NAME: Final[str] = "delayed_{exchange_name}" + + +def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int: + count: int = 0 + + x_death: list[dict[str, Any]] = message.headers.get(_HEADER_X_DEATH, []) + if x_death: + count = x_death[0]["count"] + + return count + + +async def _safe_nack( + message_handler: MessageHandler, + max_retries_upon_error: int, + message: aio_pika.abc.AbstractIncomingMessage, +) -> None: + count = _get_x_death_count(message) + if count < max_retries_upon_error: + _logger.warning( + ( + "Retry [%s/%s] for handler '%s', which raised " + "an unexpected error caused by message_id='%s'" + ), + count, + max_retries_upon_error, + message_handler, + message.message_id, + ) + # NOTE: puts message to the Dead Letter Exchange + await message.nack(requeue=False) + else: + _logger.exception( + "Handler '%s' is giving up on message '%s' with body '%s'", + message_handler, + message, + message.body, + ) + + +async def _on_message( + message_handler: MessageHandler, + max_retries_upon_error: int, + message: aio_pika.abc.AbstractIncomingMessage, +) -> None: + async with message.process(requeue=True, ignore_processed=True): + try: + with log_context( + _logger, + logging.DEBUG, + msg=f"Received message from {message.exchange=}, {message.routing_key=}", + ): + if not await message_handler(message.body): + await _safe_nack(message_handler, max_retries_upon_error, message) + except Exception: # pylint: disable=broad-exception-caught + await _safe_nack(message_handler, max_retries_upon_error, message) @dataclass @@ -79,20 +142,35 @@ async def subscribe( exclusive_queue: bool = True, topics: list[str] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, + unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S, + unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, ) -> str: - """subscribe to exchange_name calling message_handler for every incoming message - - exclusive_queue: True means that every instance of this application will receive the incoming messages - - exclusive_queue: False means that only one instance of this application will reveice the incoming message + """subscribe to exchange_name calling ``message_handler`` for every incoming message + - exclusive_queue: True means that every instance of this application will + receive the incoming messages + - exclusive_queue: False means that only one instance of this application will + reveice the incoming message - specifying a topic will make the client declare a TOPIC type of RabbitMQ Exchange instead of FANOUT - - a FANOUT exchange transmit messages to any connected queue regardless of the routing key - - a TOPIC exchange transmit messages to any connected queue provided it is bound with the message routing key + specifying a topic will make the client declare a TOPIC type of RabbitMQ Exchange + instead of FANOUT + - a FANOUT exchange transmit messages to any connected queue regardless of + the routing key + - a TOPIC exchange transmit messages to any connected queue provided it is + bound with the message routing key - topic = BIND_TO_ALL_TOPICS ("#") is equivalent to the FANOUT effect - - a queue bound with topic "director-v2.*" will receive any message that uses a routing key such as "director-v2.event.service_started" - - a queue bound with topic "director-v2.event.specific_event" will only receive messages with that exact routing key (same as DIRECT exchanges behavior) + - a queue bound with topic "director-v2.*" will receive any message that + uses a routing key such as "director-v2.event.service_started" + - a queue bound with topic "director-v2.event.specific_event" will only + receive messages with that exact routing key (same as DIRECT exchanges behavior) + + ``unexpected_error_max_attempts`` is the maximum amount of retries when the ``message_handler`` + raised an unexpected error or it returns `False` + ``unexpected_error_retry_delay_s`` time to wait between each retry when the ``message_handler`` + raised an unexpected error or it returns `False` Raises: - aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with different type is used + aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with + different type is used Returns: queue name """ @@ -115,12 +193,17 @@ async def subscribe( # consumer/publisher must set the same configuration for same queue # exclusive means that the queue is only available for THIS very client # and will be deleted when the client disconnects + # NOTE what is a dead letter exchange, see https://www.rabbitmq.com/dlx.html + delayed_exchange_name = _DELAYED_EXCHANGE_NAME.format( + exchange_name=exchange_name + ) queue = await declare_queue( channel, self.client_name, exchange_name, exclusive_queue=exclusive_queue, message_ttl=message_ttl, + arguments={"x-dead-letter-exchange": delayed_exchange_name}, ) if topics is None: await queue.bind(exchange, routing_key="") @@ -129,29 +212,23 @@ async def subscribe( *(queue.bind(exchange, routing_key=topic) for topic in topics) ) - async def _on_message( - message: aio_pika.abc.AbstractIncomingMessage, - ) -> None: - async with message.process(requeue=True, ignore_processed=True): - try: - with log_context( - _logger, - logging.DEBUG, - msg=f"Received message from {message.exchange=}, {message.routing_key=}", - ): - if not await message_handler(message.body): - await message.nack() - except Exception: # pylint: disable=broad-exception-caught - _logger.exception( - "unhandled exception when consuming RabbitMQ message, " - "this is catched but should not happen. " - "Please check, message will be queued back!" - ) - await message.nack() + delayed_exchange = await channel.declare_exchange( + delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True + ) + + delayed_queue = await declare_queue( + channel, + self.client_name, + delayed_exchange_name, + exclusive_queue=exclusive_queue, + message_ttl=int(unexpected_error_retry_delay_s * 1000), + arguments={"x-dead-letter-exchange": exchange.name}, + ) + await delayed_queue.bind(delayed_exchange) _consumer_tag = await self._get_consumer_tag(exchange_name) await queue.consume( - _on_message, + partial(_on_message, message_handler, unexpected_error_max_attempts), exclusive=exclusive_queue, consumer_tag=_consumer_tag, ) @@ -169,7 +246,15 @@ async def add_topics( async with self._channel_pool.acquire() as channel: exchange = await channel.get_exchange(exchange_name) queue = await declare_queue( - channel, self.client_name, exchange_name, exclusive_queue=True + channel, + self.client_name, + exchange_name, + exclusive_queue=True, + arguments={ + "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( + exchange_name=exchange_name + ) + }, ) await asyncio.gather( @@ -186,7 +271,15 @@ async def remove_topics( async with self._channel_pool.acquire() as channel: exchange = await channel.get_exchange(exchange_name) queue = await declare_queue( - channel, self.client_name, exchange_name, exclusive_queue=True + channel, + self.client_name, + exchange_name, + exclusive_queue=True, + arguments={ + "x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format( + exchange_name=exchange_name + ) + }, ) await asyncio.gather( diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index afec7e908a5..54f329236ea 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -1,7 +1,7 @@ import logging import os import socket -from typing import Final +from typing import Any, Final import aio_pika from pydantic import NonNegativeInt @@ -55,12 +55,16 @@ async def declare_queue( exchange_name: str, *, exclusive_queue: bool, + arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, ) -> aio_pika.abc.AbstractRobustQueue: + default_arguments = {"x-message-ttl": message_ttl} + if arguments is not None: + default_arguments.update(arguments) queue_parameters = { "durable": True, "exclusive": exclusive_queue, - "arguments": {"x-message-ttl": message_ttl}, + "arguments": default_arguments, "name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive", } if not exclusive_queue: diff --git a/packages/service-library/tests/aiohttp/test_monitor_slow_callbacks.py b/packages/service-library/tests/aiohttp/test_monitor_slow_callbacks.py index b822ca1fce5..6cd3bfbf66a 100644 --- a/packages/service-library/tests/aiohttp/test_monitor_slow_callbacks.py +++ b/packages/service-library/tests/aiohttp/test_monitor_slow_callbacks.py @@ -1,9 +1,11 @@ -# pylint:disable=unused-variable -# pylint:disable=unused-argument +# pylint:disable=protected-access # pylint:disable=redefined-outer-name +# pylint:disable=unused-argument +# pylint:disable=unused-variable import asyncio import time +from collections.abc import Iterable import pytest from servicelib.aiohttp import monitor_slow_callbacks @@ -13,7 +15,7 @@ async def slow_task(delay): - time.sleep(delay) + time.sleep(delay) # noqa: ASYNC101 @retry(wait=wait_fixed(1), stop=stop_after_attempt(2)) @@ -22,28 +24,30 @@ async def fails_to_reach_pg_db(): @pytest.fixture -def incidents_manager(event_loop): +def incidents_manager(event_loop) -> dict: incidents = [] monitor_slow_callbacks.enable(slow_duration_secs=0.2, incidents=incidents) - f1a = asyncio.ensure_future(slow_task(0.3), loop=event_loop) - f1b = asyncio.ensure_future(slow_task(0.3), loop=event_loop) - f1c = asyncio.ensure_future(slow_task(0.4), loop=event_loop) + asyncio.ensure_future(slow_task(0.3), loop=event_loop) # noqa: RUF006 + asyncio.ensure_future(slow_task(0.3), loop=event_loop) # noqa: RUF006 + asyncio.ensure_future(slow_task(0.4), loop=event_loop) # noqa: RUF006 incidents_pg = None # aiopg_utils.monitor_pg_responsiveness.enable() - f2 = asyncio.ensure_future(fails_to_reach_pg_db(), loop=event_loop) + asyncio.ensure_future(fails_to_reach_pg_db(), loop=event_loop) # noqa: RUF006 - yield {"slow_callback": incidents, "posgres_responsive": incidents_pg} + return {"slow_callback": incidents, "posgres_responsive": incidents_pg} -async def test_slow_task_incident(incidents_manager): +@pytest.fixture +def disable_monitoring() -> Iterable[None]: + original_handler = asyncio.events.Handle._run # noqa: SLF001 + yield None + asyncio.events.Handle._run = original_handler # noqa: SLF001 + + +async def test_slow_task_incident(disable_monitoring: None, incidents_manager: dict): await asyncio.sleep(2) assert len(incidents_manager["slow_callback"]) == 3 delays = [record.delay_secs for record in incidents_manager["slow_callback"]] assert max(delays) < 0.5 - - -@pytest.mark.skip(reason="TODO: Design under development") -def test_non_responsive_incident(incidents_manager): - pass diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index b4465cd7569..6af7ce216cf 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -6,16 +6,17 @@ import asyncio -from collections.abc import Callable +from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import Any +from typing import Any, Final from unittest import mock import aio_pika import pytest from faker import Faker from pytest_mock.plugin import MockerFixture -from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, RabbitMQClient +from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, RabbitMQClient, _client +from servicelib.rabbitmq._client import _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS from settings_library.rabbit import RabbitSettings from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -26,6 +27,8 @@ "rabbit", ] +_ON_ERROR_DELAY_S: Final[float] = 0.1 + @pytest.fixture def rabbit_client_name(faker: Faker) -> str: @@ -79,6 +82,80 @@ def _creator(**kwargs: dict[str, Any]) -> PytestRabbitMessage: return _creator +@pytest.fixture +def on_message_spy(mocker: MockerFixture) -> mock.Mock: + return mocker.spy(_client, "_on_message") + + +def _get_spy_report(mock: mock.Mock) -> dict[str, set[int]]: + results: dict[str, set[int]] = {} + + for entry in mock.call_args_list: + message: aio_pika.abc.AbstractIncomingMessage = entry.args[2] + assert message.routing_key is not None + + if message.routing_key not in results: + results[message.routing_key] = set() + + count = _client._get_x_death_count(message) # noqa: SLF001 + results[message.routing_key].add(count) + + return results + + +async def _setup_publisher_and_subscriber( + create_rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: Callable[[], str], + random_rabbit_message: Callable[..., PytestRabbitMessage], + max_requeue_retry: int, + topics: list[str] | None, + message_handler: Callable[[Any], Awaitable[bool]], +) -> int: + publisher = create_rabbitmq_client("publisher") + consumer = create_rabbitmq_client("consumer") + + exchange_name = f"{random_exchange_name()}" + + await consumer.subscribe( + exchange_name, + message_handler, + topics=topics, + exclusive_queue=False, + unexpected_error_max_attempts=max_requeue_retry, + unexpected_error_retry_delay_s=_ON_ERROR_DELAY_S, + ) + + if topics is not None: + for topic in topics: + message = random_rabbit_message(topic=topic) + await publisher.publish(exchange_name, message) + else: + message = random_rabbit_message() + await publisher.publish(exchange_name, message) + + topics_count: int = 1 if topics is None else len(topics) + return topics_count + + +async def _assert_wait_for_messages( + on_message_spy: mock.Mock, expected_results: int +) -> None: + total_seconds_to_wait = expected_results * _ON_ERROR_DELAY_S * 2 + print(f"Will wait for messages for {total_seconds_to_wait} seconds") + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(total_seconds_to_wait), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: + assert len(on_message_spy.call_args_list) == expected_results + + # wait some more time to make sure retry mechanism did not trigger + await asyncio.sleep(_ON_ERROR_DELAY_S * 3) + assert len(on_message_spy.call_args_list) == expected_results + + async def _assert_message_received( mocked_message_parser: mock.AsyncMock, expected_call_count: int, @@ -110,8 +187,123 @@ async def _assert_message_received( ) +_TOPICS: Final[list[list[str] | None]] = [ + None, + ["one"], + ["one", "two"], +] + + +@pytest.mark.parametrize("max_requeue_retry", [0, 1, 3, 10]) +@pytest.mark.parametrize("topics", _TOPICS) +async def test_subscribe_to_failing_message_handler( + on_message_spy: mock.Mock, + create_rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: Callable[[], str], + random_rabbit_message: Callable[..., PytestRabbitMessage], + max_requeue_retry: int, + topics: list[str] | None, +): + async def _faulty_message_handler(message: Any) -> bool: + msg = f"Always fail. Received message {message}" + raise RuntimeError(msg) + + topics_count = await _setup_publisher_and_subscriber( + create_rabbitmq_client, + random_exchange_name, + random_rabbit_message, + max_requeue_retry, + topics, + _faulty_message_handler, + ) + + expected_results = (max_requeue_retry + 1) * topics_count + await _assert_wait_for_messages(on_message_spy, expected_results) + + report = _get_spy_report(on_message_spy) + routing_keys: list[str] = [""] if topics is None else topics + assert report == {k: set(range(max_requeue_retry + 1)) for k in routing_keys} + + +@pytest.mark.parametrize("topics", _TOPICS) +async def test_subscribe_fail_then_success( + on_message_spy: mock.Mock, + create_rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: Callable[[], str], + random_rabbit_message: Callable[..., PytestRabbitMessage], + topics: list[str] | None, +): + message_status: dict[str, bool] = {} + + async def _fail_once_then_succeed(message: Any) -> bool: + if message not in message_status: + message_status[message] = False + if not message_status[message]: + message_status[message] = True + return False + return True + + topics_count = await _setup_publisher_and_subscriber( + create_rabbitmq_client, + random_exchange_name, + random_rabbit_message, + _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, + topics, + _fail_once_then_succeed, + ) + + expected_results = 2 * topics_count + await _assert_wait_for_messages(on_message_spy, expected_results) + + report = _get_spy_report(on_message_spy) + routing_keys: list[str] = [""] if topics is None else topics + assert report == {k: set(range(2)) for k in routing_keys} + + # check messages as expected + original_message_count = 0 + requeued_message_count = 0 + for entry in on_message_spy.call_args_list: + message = entry.args[2] + if message.headers == {}: + original_message_count += 1 + if message.headers and message.headers["x-death"][0]["count"] == 1: + requeued_message_count += 1 + + assert original_message_count == topics_count + assert requeued_message_count == topics_count + + +@pytest.mark.parametrize("topics", _TOPICS) +async def test_subscribe_always_returns_fails_stops( + on_message_spy: mock.Mock, + create_rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: Callable[[], str], + random_rabbit_message: Callable[..., PytestRabbitMessage], + topics: list[str] | None, +): + async def _always_returning_fail(_: Any) -> bool: + return False + + topics_count = await _setup_publisher_and_subscriber( + create_rabbitmq_client, + random_exchange_name, + random_rabbit_message, + _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, + topics, + _always_returning_fail, + ) + + expected_results = (_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS + 1) * topics_count + await _assert_wait_for_messages(on_message_spy, expected_results) + + report = _get_spy_report(on_message_spy) + routing_keys: list[str] = [""] if topics is None else topics + assert report == { + k: set(range(_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS + 1)) for k in routing_keys + } + + async def test_rabbit_client_pub_sub_message_is_lost_if_no_consumer_present( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocked_message_parser: mock.AsyncMock, @@ -129,7 +321,6 @@ async def test_rabbit_client_pub_sub_message_is_lost_if_no_consumer_present( async def test_rabbit_client_pub_sub( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocked_message_parser: mock.AsyncMock, @@ -147,7 +338,6 @@ async def test_rabbit_client_pub_sub( @pytest.mark.parametrize("num_subs", [10]) async def test_rabbit_client_pub_many_subs( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocker: MockerFixture, @@ -179,7 +369,6 @@ async def test_rabbit_client_pub_many_subs( async def test_rabbit_client_pub_sub_republishes_if_exception_raised( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocked_message_parser: mock.AsyncMock, @@ -210,7 +399,6 @@ def _raise_once_then_true(*args, **kwargs): @pytest.mark.parametrize("num_subs", [10]) async def test_pub_sub_with_non_exclusive_queue( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocker: MockerFixture, @@ -275,7 +463,6 @@ def run_test_async(): async def test_rabbit_pub_sub_with_topic( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocker: MockerFixture, @@ -330,7 +517,6 @@ async def test_rabbit_pub_sub_with_topic( async def test_rabbit_pub_sub_bind_and_unbind_topics( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocked_message_parser: mock.AsyncMock, @@ -400,9 +586,7 @@ async def test_rabbit_pub_sub_bind_and_unbind_topics( await _assert_message_received(mocked_message_parser, 0) -@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() async def test_rabbit_adding_topics_to_a_fanout_exchange( - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], mocked_message_parser: mock.AsyncMock, diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq_connection.py b/packages/service-library/tests/rabbitmq/test_rabbitmq_connection.py index e7d6c6dfea8..fe69986af30 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq_connection.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq_connection.py @@ -59,7 +59,6 @@ async def async_docker_client() -> AsyncIterator[aiodocker.Docker]: async def test_rabbit_client_lose_connection( async_docker_client: aiodocker.Docker, - cleanup_check_rabbitmq_server_has_no_errors: None, create_rabbitmq_client: Callable[[str], RabbitMQClient], docker_client: docker.client.DockerClient, ): diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq_utils.py b/packages/service-library/tests/rabbitmq/test_rabbitmq_utils.py index 8f57aaeedc2..b07f8e8cb8d 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq_utils.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq_utils.py @@ -10,28 +10,33 @@ ({"hello": "1", "b": "2"}, "b_2-hello_1"), ], ) +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() # no rabbitmq instance running def test_rpc_namespace_from_entries(entries: dict[str, str], expected: str): assert RPCNamespace.from_entries(entries) == expected +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() # no rabbitmq instance running def test_rpc_namespace_sorts_elements(): assert RPCNamespace.from_entries({"1": "a", "2": "b"}) == RPCNamespace.from_entries( {"2": "b", "1": "a"} ) +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() # no rabbitmq instance running def test_rpc_namespace_too_long(): with pytest.raises(ValidationError) as exec_info: RPCNamespace.from_entries({f"test{i}": f"test{i}" for i in range(20)}) assert "ensure this value has at most 252 characters" in f"{exec_info.value}" +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() # no rabbitmq instance running def test_rpc_namespace_too_short(): with pytest.raises(ValidationError) as exec_info: RPCNamespace.from_entries({}) assert "ensure this value has at least 1 characters" in f"{exec_info.value}" +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() # no rabbitmq instance running def test_rpc_namespace_invalid_symbols(): with pytest.raises(ValidationError) as exec_info: RPCNamespace.from_entries({"test": "@"})