From 7deadc8b077249ea55767e2c1d4b24cfb693632a Mon Sep 17 00:00:00 2001 From: Daniel Bluhm Date: Fri, 3 Mar 2023 09:12:43 -0500 Subject: [PATCH] feat: get queued outbound message in transport handle message This will enable queues to annotate outbound messages with relevant context for things like dead letter queues that know which wallet sent the message that failed. Signed-off-by: Daniel Bluhm --- aries_cloudagent/transport/outbound/base.py | 40 +++++++++++++++++-- .../transport/outbound/manager.py | 38 ++---------------- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/aries_cloudagent/transport/outbound/base.py b/aries_cloudagent/transport/outbound/base.py index 2186b8bfc0..eb324013b8 100644 --- a/aries_cloudagent/transport/outbound/base.py +++ b/aries_cloudagent/transport/outbound/base.py @@ -1,14 +1,48 @@ """Base outbound transport.""" -import asyncio from abc import ABC, abstractmethod +import asyncio from typing import Union +from ...connections.models.connection_target import ConnectionTarget from ...core.profile import Profile from ...utils.stats import Collector - from ..error import TransportError from ..wire_format import BaseWireFormat +from .message import OutboundMessage + + +class QueuedOutboundMessage: + """Class representing an outbound message pending delivery.""" + + STATE_NEW = "new" + STATE_PENDING = "pending" + STATE_ENCODE = "encode" + STATE_DELIVER = "deliver" + STATE_RETRY = "retry" + STATE_DONE = "done" + + def __init__( + self, + profile: Profile, + message: OutboundMessage, + target: ConnectionTarget, + transport_id: str, + ): + """Initialize the queued outbound message.""" + self.profile = profile + self.endpoint = target and target.endpoint + self.error: Exception = None + self.message = message + self.payload: Union[str, bytes] = None + self.retries = None + self.retry_at: float = None + self.state = self.STATE_NEW + self.target = target + self.task: asyncio.Task = None + self.transport_id: str = transport_id + self.metadata: dict = None + self.api_key: str = None class BaseOutboundTransport(ABC): @@ -66,7 +100,7 @@ async def stop(self): async def handle_message( self, profile: Profile, - payload: Union[str, bytes], + outbound_message: QueuedOutboundMessage, endpoint: str, metadata: dict = None, ): diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 094f537f32..dadc8e7f44 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -5,7 +5,7 @@ import logging import time -from typing import Callable, Type, Union +from typing import Callable, Type from urllib.parse import urlparse from ...connections.models.connection_target import ConnectionTarget @@ -22,6 +22,7 @@ BaseOutboundTransport, OutboundDeliveryError, OutboundTransportRegistrationError, + QueuedOutboundMessage, ) from .message import OutboundMessage @@ -29,39 +30,6 @@ MODULE_BASE_PATH = "aries_cloudagent.transport.outbound" -class QueuedOutboundMessage: - """Class representing an outbound message pending delivery.""" - - STATE_NEW = "new" - STATE_PENDING = "pending" - STATE_ENCODE = "encode" - STATE_DELIVER = "deliver" - STATE_RETRY = "retry" - STATE_DONE = "done" - - def __init__( - self, - profile: Profile, - message: OutboundMessage, - target: ConnectionTarget, - transport_id: str, - ): - """Initialize the queued outbound message.""" - self.profile = profile - self.endpoint = target and target.endpoint - self.error: Exception = None - self.message = message - self.payload: Union[str, bytes] = None - self.retries = None - self.retry_at: float = None - self.state = self.STATE_NEW - self.target = target - self.task: asyncio.Task = None - self.transport_id: str = transport_id - self.metadata: dict = None - self.api_key: str = None - - class OutboundTransportManager: """Outbound transport manager class.""" @@ -280,7 +248,7 @@ async def enqueue_message(self, profile: Profile, outbound: OutboundMessage): profile, outbound, target ) await transport.handle_message( - profile, encoded_outbound_message.payload, target.endpoint + profile, encoded_outbound_message, target.endpoint ) else: queued = QueuedOutboundMessage(profile, outbound, target, transport_id)