Skip to content

Commit

Permalink
Merge pull request #408 from andrewwhitehead/feature/outbound-context
Browse files Browse the repository at this point in the history
Make context available in outbound transport handler
  • Loading branch information
andrewwhitehead authored Mar 13, 2020
2 parents fc72ccc + fe7a977 commit 592c431
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 15 deletions.
6 changes: 5 additions & 1 deletion aries_cloudagent/transport/outbound/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from typing import Union

from ...config.injection_context import InjectionContext
from ...utils.stats import Collector

from ..error import TransportError
Expand Down Expand Up @@ -57,11 +58,14 @@ def wire_format(self, format: BaseWireFormat):
self._wire_format = format

@abstractmethod
async def handle_message(self, payload: Union[str, bytes], endpoint: str):
async def handle_message(
self, context: InjectionContext, payload: Union[str, bytes], endpoint: str
):
"""
Handle message from queue.
Args:
context: the context that produced the message
payload: message payload in string or byte format
endpoint: URI endpoint for delivery
"""
Expand Down
10 changes: 8 additions & 2 deletions aries_cloudagent/transport/outbound/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from aiohttp import ClientSession, DummyCookieJar, TCPConnector

from ...config.injection_context import InjectionContext

from ..stats import StatsTracer

from .base import BaseOutboundTransport, OutboundTransportError
Expand Down Expand Up @@ -40,12 +42,16 @@ async def stop(self):
await self.client_session.close()
self.client_session = None

async def handle_message(self, payload: Union[str, bytes], endpoint: str):
async def handle_message(
self, context: InjectionContext, payload: Union[str, bytes], endpoint: str
):
"""
Handle message from queue.
Args:
message: `OutboundMessage` to send over transport implementation
context: the context that produced the message
payload: message payload in string or byte format
endpoint: URI endpoint for delivery
"""
if not endpoint:
raise OutboundTransportError("No endpoint provided")
Expand Down
7 changes: 3 additions & 4 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def deliver_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
"""Kick off delivery of a queued message."""
transport = self.get_transport_instance(queued.transport_id)
queued.task = self.task_queue.run(
transport.handle_message(queued.payload, queued.endpoint),
transport.handle_message(queued.context, queued.payload, queued.endpoint),
lambda completed: self.finished_deliver(queued, completed),
)
return queued.task
Expand All @@ -411,15 +411,14 @@ def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTa
if queued.retries:
LOGGER.error(
">>> Posting error: %s; Re-queue failed message ...",
queued.endpoint
queued.endpoint,
)
queued.retries -= 1
queued.state = QueuedOutboundMessage.STATE_RETRY
queued.retry_at = time.perf_counter() + 10
else:
LOGGER.exception(
"Outbound message could not be delivered",
exc_info=queued.error,
"Outbound message could not be delivered", exc_info=queued.error,
)
LOGGER.error(">>> NOT Re-queued, state is DONE, failed to deliver msg.")
queued.state = QueuedOutboundMessage.STATE_DONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
from aiohttp import web

from ....config.injection_context import InjectionContext
from ....utils.stats import Collector

from ...outbound.message import OutboundMessage
Expand All @@ -12,6 +13,7 @@

class TestHttpTransport(AioHTTPTestCase):
async def setUpAsync(self):
self.context = InjectionContext()
self.message_results = []

async def receive_message(self, request):
Expand All @@ -33,7 +35,7 @@ async def test_handle_message(self):

async def send_message(transport, payload, endpoint):
async with transport:
await transport.handle_message(payload, endpoint)
await transport.handle_message(self.context, payload, endpoint)

transport = HttpTransport()
await asyncio.wait_for(send_message(transport, "{}", endpoint=server_addr), 5.0)
Expand All @@ -45,7 +47,7 @@ async def test_stats(self):

async def send_message(transport, payload, endpoint):
async with transport:
await transport.handle_message(payload, endpoint)
await transport.handle_message(self.context, payload, endpoint)

transport = HttpTransport()
transport.collector = Collector()
Expand Down
9 changes: 6 additions & 3 deletions aries_cloudagent/transport/outbound/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,20 @@ async def test_send_message(self):
sender_key=4,
)

mgr.enqueue_message(context, message)
send_context = InjectionContext()
mgr.enqueue_message(send_context, message)
await mgr.flush()
transport.wire_format.encode_message.assert_awaited_once_with(
context,
send_context,
message.payload,
message.target.recipient_keys,
message.target.routing_keys,
message.target.sender_key,
)
transport.handle_message.assert_awaited_once_with(
transport.wire_format.encode_message.return_value, message.target.endpoint
send_context,
transport.wire_format.encode_message.return_value,
message.target.endpoint,
)
await mgr.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
from aiohttp import web, WSMsgType

from ....config.injection_context import InjectionContext

from ..ws import WsTransport


class TestWsTransport(AioHTTPTestCase):
async def setUpAsync(self):
self.context = InjectionContext()
self.message_results = []

async def receive_message(self, request):
Expand Down Expand Up @@ -38,7 +41,7 @@ async def test_handle_message(self):

async def send_message(transport, payload, endpoint: str):
async with transport:
await transport.handle_message(payload, endpoint)
await transport.handle_message(self.context, payload, endpoint)

transport = WsTransport()
await asyncio.wait_for(send_message(transport, "{}", endpoint=server_addr), 5.0)
Expand Down
10 changes: 8 additions & 2 deletions aries_cloudagent/transport/outbound/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from aiohttp import ClientSession, DummyCookieJar

from ...config.injection_context import InjectionContext

from .base import BaseOutboundTransport


Expand All @@ -28,12 +30,16 @@ async def stop(self):
await self.client_session.close()
self.client_session = None

async def handle_message(self, payload: Union[str, bytes], endpoint: str):
async def handle_message(
self, context: InjectionContext, payload: Union[str, bytes], endpoint: str
):
"""
Handle message from queue.
Args:
message: `OutboundMessage` to send over transport implementation
context: the context that produced the message
payload: message payload in string or byte format
endpoint: URI endpoint for delivery
"""
# aiohttp should automatically handle websocket sessions
async with self.client_session.ws_connect(endpoint) as ws:
Expand Down

0 comments on commit 592c431

Please sign in to comment.