Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/undelivered events #1694

Merged
21 changes: 20 additions & 1 deletion aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,26 @@ async def outbound_message_router(
"""
Route an outbound message.

Args:
profile: The active profile for the request
message: An outbound message to be sent
inbound: The inbound message that produced this response, if available
"""
status: OutboundSendStatus = await self._outbound_message_router(
profile=profile, outbound=outbound, inbound=inbound
)
await profile.notify(status.topic, outbound)
return status

async def _outbound_message_router(
self,
profile: Profile,
outbound: OutboundMessage,
inbound: InboundMessage = None,
) -> OutboundSendStatus:
"""
Route an outbound message.

Args:
profile: The active profile for the request
message: An outbound message to be sent
Expand Down Expand Up @@ -697,7 +717,6 @@ def handle_not_delivered(
) -> OutboundSendStatus:
"""Handle a message that failed delivery via outbound transports."""
queued_for_inbound = self.inbound_transport_manager.return_undelivered(outbound)

return (
OutboundSendStatus.WAITING_FOR_PICKUP
if queued_for_inbound
Expand Down
54 changes: 46 additions & 8 deletions aries_cloudagent/core/tests/test_conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PublicKeyType,
Service,
)
from ...core.event_bus import EventBus, MockEventBus
from ...core.in_memory import InMemoryProfileManager
from ...core.profile import ProfileManager
from ...core.protocol_registry import ProtocolRegistry
Expand All @@ -34,6 +35,7 @@
from ...transport.outbound.base import OutboundDeliveryError
from ...transport.outbound.manager import QueuedOutboundMessage
from ...transport.outbound.message import OutboundMessage
from ...transport.outbound.status import OutboundSendStatus
from ...transport.wire_format import BaseWireFormat
from ...transport.pack_format import PackWireFormat
from ...utils.stats import Collector
Expand Down Expand Up @@ -92,6 +94,7 @@ async def build_context(self) -> InjectionContext:
context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry())
context.injector.bind_instance(BaseWireFormat, self.wire_format)
context.injector.bind_instance(DIDResolver, DIDResolver(DIDResolverRegistry()))
context.injector.bind_instance(EventBus, MockEventBus())
return context


Expand Down Expand Up @@ -297,6 +300,8 @@ async def test_outbound_message_handler_return_route(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
message = OutboundMessage(payload=payload)
message.reply_to_verkey = test_to_verkey
Expand All @@ -310,7 +315,14 @@ async def test_outbound_message_handler_return_route(self):
conductor, "queue_outbound", async_mock.CoroutineMock()
) as mock_queue:
mock_return.return_value = True
await conductor.outbound_message_router(conductor.context, message)

status = await conductor.outbound_message_router(
conductor.root_profile, message
)
assert status == OutboundSendStatus.SENT_TO_SESSION
assert bus.events
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message
mock_return.assert_called_once_with(message)
mock_queue.assert_not_awaited()

Expand All @@ -324,16 +336,24 @@ async def test_outbound_message_handler_with_target(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
target = ConnectionTarget(
endpoint="endpoint", recipient_keys=(), routing_keys=(), sender_key=""
)
message = OutboundMessage(payload=payload, target=target)

await conductor.outbound_message_router(conductor.context, message)

status = await conductor.outbound_message_router(
conductor.root_profile, message
)
assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message
mock_outbound_mgr.return_value.enqueue_message.assert_called_once_with(
conductor.context, message
conductor.root_profile, message
)

async def test_outbound_message_handler_with_connection(self):
Expand All @@ -348,11 +368,21 @@ async def test_outbound_message_handler_with_connection(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
connection_id = "connection_id"
message = OutboundMessage(payload=payload, connection_id=connection_id)

await conductor.outbound_message_router(conductor.root_profile, message)
status = await conductor.outbound_message_router(
conductor.root_profile, message
)

assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message

conn_mgr.return_value.get_connection_targets.assert_awaited_once_with(
connection_id=connection_id
Expand All @@ -376,21 +406,29 @@ async def test_outbound_message_handler_with_verkey_no_target(self):

await conductor.setup()

bus = conductor.root_profile.inject(EventBus)

payload = "{}"
message = OutboundMessage(
payload=payload, reply_to_verkey=TestDIDs.test_verkey
)

await conductor.outbound_message_router(
conductor.context,
status = await conductor.outbound_message_router(
conductor.root_profile,
message,
inbound=async_mock.MagicMock(
receipt=async_mock.MagicMock(recipient_verkey=TestDIDs.test_verkey)
),
)

assert status == OutboundSendStatus.QUEUED_FOR_DELIVERY
assert bus.events
print(bus.events)
assert bus.events[0][1].topic == status.topic
assert bus.events[0][1].payload == message

mock_outbound_mgr.return_value.enqueue_message.assert_called_once_with(
conductor.context, message
conductor.root_profile, message
)

async def test_handle_nots(self):
Expand Down
7 changes: 7 additions & 0 deletions aries_cloudagent/transport/outbound/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from enum import Enum

OUTBOUND_STATUS_PREFIX = "acapy::outbound-message::"


class OutboundSendStatus(Enum):
"""Send status of outbound messages."""
Expand All @@ -21,3 +23,8 @@ class OutboundSendStatus(Enum):

# No endpoint available, and no internal queue for messages.
UNDELIVERABLE = "undeliverable"

@property
def topic(self):
"""Return an event topic associated with a given status."""
return f"{OUTBOUND_STATUS_PREFIX}{self.value}"