Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING: feat: get queued outbound message in transport handle message #2170

40 changes: 37 additions & 3 deletions aries_cloudagent/transport/outbound/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,48 @@
"""Base outbound transport."""

import asyncio
from abc import ABC, abstractmethod
import asyncio
from typing import Union

from ...connections.models.connection_target import ConnectionTarget
from ...core.profile import Profile
from ...utils.stats import Collector

from ..error import TransportError
from ..wire_format import BaseWireFormat
from .message import OutboundMessage


class QueuedOutboundMessage:
"""Class representing an outbound message pending delivery."""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

def __init__(
self,
profile: Profile,
message: OutboundMessage,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload: Union[str, bytes] = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None


class BaseOutboundTransport(ABC):
Expand Down Expand Up @@ -66,7 +100,7 @@ async def stop(self):
async def handle_message(
self,
profile: Profile,
payload: Union[str, bytes],
outbound_message: QueuedOutboundMessage,
endpoint: str,
metadata: dict = None,
):
Expand Down
38 changes: 3 additions & 35 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time

from typing import Callable, Type, Union
from typing import Callable, Type
from urllib.parse import urlparse

from ...connections.models.connection_target import ConnectionTarget
Expand All @@ -22,46 +22,14 @@
BaseOutboundTransport,
OutboundDeliveryError,
OutboundTransportRegistrationError,
QueuedOutboundMessage,
)
from .message import OutboundMessage

LOGGER = logging.getLogger(__name__)
MODULE_BASE_PATH = "aries_cloudagent.transport.outbound"


class QueuedOutboundMessage:
"""Class representing an outbound message pending delivery."""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

def __init__(
self,
profile: Profile,
message: OutboundMessage,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload: Union[str, bytes] = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None


class OutboundTransportManager:
"""Outbound transport manager class."""

Expand Down Expand Up @@ -280,7 +248,7 @@ async def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
profile, outbound, target
)
await transport.handle_message(
profile, encoded_outbound_message.payload, target.endpoint
profile, encoded_outbound_message, target.endpoint
)
else:
queued = QueuedOutboundMessage(profile, outbound, target, transport_id)
Expand Down