Skip to content

Commit

Permalink
Merge pull request #1694 from Indicio-tech/feature/undelivered-events
Browse files Browse the repository at this point in the history
Feature/undelivered events
  • Loading branch information
swcurran authored Apr 5, 2022
2 parents 2b3e6e6 + 76c886c commit 74b5792
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
21 changes: 20 additions & 1 deletion aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,26 @@ async def outbound_message_router(
"""
Route an outbound message.
Args:
profile: The active profile for the request
message: An outbound message to be sent
inbound: The inbound message that produced this response, if available
"""
status: OutboundSendStatus = await self._outbound_message_router(
profile=profile, outbound=outbound, inbound=inbound
)
await profile.notify(status.topic, outbound)
return status

async def _outbound_message_router(
self,
profile: Profile,
outbound: OutboundMessage,
inbound: InboundMessage = None,
) -> OutboundSendStatus:
"""
Route an outbound message.
Args:
profile: The active profile for the request
message: An outbound message to be sent
Expand Down Expand Up @@ -697,7 +717,6 @@ def handle_not_delivered(
) -> OutboundSendStatus:
"""Handle a message that failed delivery via outbound transports."""
queued_for_inbound = self.inbound_transport_manager.return_undelivered(outbound)

return (
OutboundSendStatus.WAITING_FOR_PICKUP
if queued_for_inbound
Expand Down
54 changes: 46 additions & 8 deletions aries_cloudagent/core/tests/test_conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PublicKeyType,
Service,
)
from ...core.event_bus import EventBus, MockEventBus
from ...core.in_memory import InMemoryProfileManager
from ...core.profile import ProfileManager
from ...core.protocol_registry import ProtocolRegistry
Expand All @@ -34,6 +35,7 @@
from ...transport.outbound.base import OutboundDeliveryError
from ...transport.outbound.manager import QueuedOutboundMessage
from ...transport.outbound.message import OutboundMessage
from ...transport.outbound.status import OutboundSendStatus
from ...transport.wire_format import BaseWireFormat
from ...transport.pack_format import PackWireFormat
from ...utils.stats import Collector
Expand Down Expand Up @@ -92,6 +94,7 @@ async def build_context(self) -> InjectionContext:
context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry())
context.injector.bind_instance(BaseWireFormat, self.wire_format)
context.injector.bind_instance(DIDResolver, DIDResolver(DIDResolverRegistry()))
context.injector.bind_instance(EventBus, MockEventBus())
return context


Expand Down Expand Up @@ -297,6 +300,8 @@ async def test_outbound_message_handler_return_route(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
message = OutboundMessage(payload=payload)
message.reply_to_verkey = test_to_verkey
Expand All @@ -310,7 +315,14 @@ async def test_outbound_message_handler_return_route(self):
conductor, "queue_outbound", async_mock.CoroutineMock()
) as mock_queue:
mock_return.return_value = True
await conductor.outbound_message_router(conductor.context, message)

status = await conductor.outbound_message_router(
conductor.root_profile, message
)
assert status == OutboundSendStatus.SENT_TO_SESSION
assert bus.events
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message
mock_return.assert_called_once_with(message)
mock_queue.assert_not_awaited()

Expand All @@ -324,16 +336,24 @@ async def test_outbound_message_handler_with_target(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
target = ConnectionTarget(
endpoint="endpoint", recipient_keys=(), routing_keys=(), sender_key=""
)
message = OutboundMessage(payload=payload, target=target)

await conductor.outbound_message_router(conductor.context, message)

status = await conductor.outbound_message_router(
conductor.root_profile, message
)
assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message
mock_outbound_mgr.return_value.enqueue_message.assert_called_once_with(
conductor.context, message
conductor.root_profile, message
)

async def test_outbound_message_handler_with_connection(self):
Expand All @@ -348,11 +368,21 @@ async def test_outbound_message_handler_with_connection(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
connection_id = "connection_id"
message = OutboundMessage(payload=payload, connection_id=connection_id)

await conductor.outbound_message_router(conductor.root_profile, message)
status = await conductor.outbound_message_router(
conductor.root_profile, message
)

assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message

conn_mgr.return_value.get_connection_targets.assert_awaited_once_with(
connection_id=connection_id
Expand All @@ -376,21 +406,29 @@ async def test_outbound_message_handler_with_verkey_no_target(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
message = OutboundMessage(
payload=payload, reply_to_verkey=TestDIDs.test_verkey
)

await conductor.outbound_message_router(
conductor.context,
status = await conductor.outbound_message_router(
conductor.root_profile,
message,
inbound=async_mock.MagicMock(
receipt=async_mock.MagicMock(recipient_verkey=TestDIDs.test_verkey)
),
)

assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message

mock_outbound_mgr.return_value.enqueue_message.assert_called_once_with(
conductor.context, message
conductor.root_profile, message
)

async def test_handle_nots(self):
Expand Down
7 changes: 7 additions & 0 deletions aries_cloudagent/transport/outbound/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from enum import Enum

OUTBOUND_STATUS_PREFIX = "acapy::outbound-message::"


class OutboundSendStatus(Enum):
"""Send status of outbound messages."""
Expand All @@ -21,3 +23,8 @@ class OutboundSendStatus(Enum):

# No endpoint available, and no internal queue for messages.
UNDELIVERABLE = "undeliverable"

@property
def topic(self):
"""Return an event topic associated with a given status."""
return f"{OUTBOUND_STATUS_PREFIX}{self.value}"

0 comments on commit 74b5792

Please sign in to comment.