diff --git a/aries_cloudagent/transport/outbound/base.py b/aries_cloudagent/transport/outbound/base.py index 2186b8bfc0..eb324013b8 100644 --- a/aries_cloudagent/transport/outbound/base.py +++ b/aries_cloudagent/transport/outbound/base.py @@ -1,14 +1,48 @@ """Base outbound transport.""" -import asyncio from abc import ABC, abstractmethod +import asyncio from typing import Union +from ...connections.models.connection_target import ConnectionTarget from ...core.profile import Profile from ...utils.stats import Collector - from ..error import TransportError from ..wire_format import BaseWireFormat +from .message import OutboundMessage + + +class QueuedOutboundMessage: + """Class representing an outbound message pending delivery.""" + + STATE_NEW = "new" + STATE_PENDING = "pending" + STATE_ENCODE = "encode" + STATE_DELIVER = "deliver" + STATE_RETRY = "retry" + STATE_DONE = "done" + + def __init__( + self, + profile: Profile, + message: OutboundMessage, + target: ConnectionTarget, + transport_id: str, + ): + """Initialize the queued outbound message.""" + self.profile = profile + self.endpoint = target and target.endpoint + self.error: Exception = None + self.message = message + self.payload: Union[str, bytes] = None + self.retries = None + self.retry_at: float = None + self.state = self.STATE_NEW + self.target = target + self.task: asyncio.Task = None + self.transport_id: str = transport_id + self.metadata: dict = None + self.api_key: str = None class BaseOutboundTransport(ABC): @@ -66,7 +100,7 @@ async def stop(self): async def handle_message( self, profile: Profile, - payload: Union[str, bytes], + outbound_message: QueuedOutboundMessage, endpoint: str, metadata: dict = None, ): diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 094f537f32..dadc8e7f44 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -5,7 +5,7 @@ import logging import time -from typing import Callable, Type, Union +from typing import Callable, Type from urllib.parse import urlparse from ...connections.models.connection_target import ConnectionTarget @@ -22,6 +22,7 @@ BaseOutboundTransport, OutboundDeliveryError, OutboundTransportRegistrationError, + QueuedOutboundMessage, ) from .message import OutboundMessage @@ -29,39 +30,6 @@ MODULE_BASE_PATH = "aries_cloudagent.transport.outbound" -class QueuedOutboundMessage: - """Class representing an outbound message pending delivery.""" - - STATE_NEW = "new" - STATE_PENDING = "pending" - STATE_ENCODE = "encode" - STATE_DELIVER = "deliver" - STATE_RETRY = "retry" - STATE_DONE = "done" - - def __init__( - self, - profile: Profile, - message: OutboundMessage, - target: ConnectionTarget, - transport_id: str, - ): - """Initialize the queued outbound message.""" - self.profile = profile - self.endpoint = target and target.endpoint - self.error: Exception = None - self.message = message - self.payload: Union[str, bytes] = None - self.retries = None - self.retry_at: float = None - self.state = self.STATE_NEW - self.target = target - self.task: asyncio.Task = None - self.transport_id: str = transport_id - self.metadata: dict = None - self.api_key: str = None - - class OutboundTransportManager: """Outbound transport manager class.""" @@ -280,7 +248,7 @@ async def enqueue_message(self, profile: Profile, outbound: OutboundMessage): profile, outbound, target ) await transport.handle_message( - profile, encoded_outbound_message.payload, target.endpoint + profile, encoded_outbound_message, target.endpoint ) else: queued = QueuedOutboundMessage(profile, outbound, target, transport_id)