Skip to content

Commit

Permalink
log exceptions in channel reader
Browse files Browse the repository at this point in the history
Also, fix obvious error in connection.py (self.reader check),
more realistic rpc example and __close_reply_text in channel.py
  • Loading branch information
horpto committed Jan 30, 2022
1 parent 4a2f75d commit 5c28fd6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
60 changes: 36 additions & 24 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ Simple consumer
# Declaring queue
declare_ok = await channel.queue_declare('helo')
consume_ok = await channel.basic_consume(
declare_ok.queue, on_message, no_ack=True
declare_ok.queue, on_message, no_ack=True,
)
print("Consumer started")
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -544,6 +545,8 @@ RPC server
.. code-block:: python
import asyncio
import functools
from contextlib import suppress
import aiormq
import aiormq.types
Expand All @@ -557,46 +560,55 @@ RPC server
return fib(n-1) + fib(n-2)
async def on_message(message:aiormq.types.DeliveredMessage):
n = int(message.body.decode())
async def on_message(num, message: aiormq.types.DeliveredMessage):
try:
n = int(message.body.decode())
print(" [.] fib(%d)" % n)
response = str(fib(n)).encode()
print(" [.] fib(%d) from %s channel" % (n, num)
await message.channel.basic_publish(
response, routing_key=message.header.properties.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.header.properties.correlation_id
),
fib_n = await asyncio.run_in_executor(None, fib, n)
response = str(fib(n)).encode()
)
await message.channel.basic_publish(
response, routing_key=message.header.properties.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.header.properties.correlation_id
),
await message.channel.basic_ack(message.delivery.delivery_tag)
print('Request complete')
)
except Exception as exc:
print('Message exception:', exc)
finally:
await message.channel.basic_ack(message.delivery.delivery_tag)
print(f'Request from {num} channel complete')
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
for i in range(10):
# Creating a channel
channel = await connection.channel()
# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')
# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')
# Start listening the queue with name 'hello'
await channel.basic_consume(declare_ok.queue, on_message)
# Start listening the queue with name 'hello'
# with several consumers simultaneously
on_message = functools.partial(on_message, i)
await channel.basic_consume(declare_ok.queue, on_message)
print(f'Consumer {i} started')
print(" [x] Awaiting RPC requests")
with suppress(asyncio.CancelledError):
await connection.closing
# waiting for the connections closing to release recources
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()
RPC client
**********
Expand Down
12 changes: 7 additions & 5 deletions aiormq/channel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import asyncio
import logging
import os
from binascii import hexlify
from collections import OrderedDict
from contextlib import suppress
from functools import partial
from io import BytesIO
from random import getrandbits
from types import MappingProxyType
from typing import (
Any, Awaitable, Dict, Generator, Mapping, Optional, Set, Type, Union,
Any, Dict, Generator, Mapping, Optional, Set, Type, Union,
)
from uuid import UUID

Expand Down Expand Up @@ -260,7 +258,10 @@ async def _on_deliver(self, frame: spec.Basic.Deliver) -> None:
consumer = self.consumers.get(frame.consumer_tag)
if consumer is not None:
# noinspection PyAsyncCall
self.create_task(consumer(message))
try:
self.create_task(consumer(message))
except Exception:
log.exception('Unhandled consumer exception')

async def _on_get(
self, frame: Union[spec.Basic.GetOk, spec.Basic.GetEmpty]
Expand Down Expand Up @@ -414,14 +415,15 @@ async def _reader(self) -> None:
except asyncio.CancelledError:
return
except Exception as e: # pragma: nocover
log.debug("Channel reader exception %r", exc_info=e)
log.exception("Channel reader exception")
await self._cancel_tasks(e)
raise

async def _on_close(self, exc: Optional[ExceptionType] = None) -> None:
await self.rpc(
spec.Channel.Close(
reply_code=self.__close_reply_code,
reply_text=self.__close_reply_text,
class_id=self.__close_class_id,
method_id=self.__close_method_id,
),
Expand Down
6 changes: 3 additions & 3 deletions aiormq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ async def get_frame(self) -> ReceivedFrame:

async with self.lock:
try:
if self.reader is None:
raise ConnectionError

frame_header = await countdown(self.reader.readexactly(1))

if frame_header == b"\0x00":
raise AMQPFrameError(
await countdown(self.reader.read()),
)

if self.reader is None:
raise ConnectionError

frame_header += await countdown(self.reader.readexactly(6))

if not self.started and frame_header.startswith(b"AMQP"):
Expand Down

0 comments on commit 5c28fd6

Please sign in to comment.