Skip to content

Commit

Permalink
Merge pull request #2170 from sicpa-dlab/feature/outbound-context-in-…
Browse files Browse the repository at this point in the history
…queues

BREAKING: feat: get queued outbound message in transport handle message
  • Loading branch information
swcurran authored Jul 5, 2023
2 parents dceb9e4 + d5fb0de commit dd3c864
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
40 changes: 37 additions & 3 deletions aries_cloudagent/transport/outbound/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,48 @@
"""Base outbound transport."""

import asyncio
from abc import ABC, abstractmethod
import asyncio
from typing import Union

from ...connections.models.connection_target import ConnectionTarget
from ...core.profile import Profile
from ...utils.stats import Collector

from ..error import TransportError
from ..wire_format import BaseWireFormat
from .message import OutboundMessage


class QueuedOutboundMessage:
"""Class representing an outbound message pending delivery."""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

def __init__(
self,
profile: Profile,
message: OutboundMessage,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload: Union[str, bytes] = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None


class BaseOutboundTransport(ABC):
Expand Down Expand Up @@ -66,7 +100,7 @@ async def stop(self):
async def handle_message(
self,
profile: Profile,
payload: Union[str, bytes],
outbound_message: QueuedOutboundMessage,
endpoint: str,
metadata: dict = None,
):
Expand Down
38 changes: 3 additions & 35 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time

from typing import Callable, Type, Union
from typing import Callable, Type
from urllib.parse import urlparse

from ...connections.models.connection_target import ConnectionTarget
Expand All @@ -22,46 +22,14 @@
BaseOutboundTransport,
OutboundDeliveryError,
OutboundTransportRegistrationError,
QueuedOutboundMessage,
)
from .message import OutboundMessage

LOGGER = logging.getLogger(__name__)
MODULE_BASE_PATH = "aries_cloudagent.transport.outbound"


class QueuedOutboundMessage:
"""Class representing an outbound message pending delivery."""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

def __init__(
self,
profile: Profile,
message: OutboundMessage,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload: Union[str, bytes] = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None


class OutboundTransportManager:
"""Outbound transport manager class."""

Expand Down Expand Up @@ -280,7 +248,7 @@ async def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
profile, outbound, target
)
await transport.handle_message(
profile, encoded_outbound_message.payload, target.endpoint
profile, encoded_outbound_message, target.endpoint
)
else:
queued = QueuedOutboundMessage(profile, outbound, target, transport_id)
Expand Down

0 comments on commit dd3c864

Please sign in to comment.