Skip to content

Commit

Permalink
Merge pull request #1408 from globalid/feat-IDENT-3334-external-queue…
Browse files Browse the repository at this point in the history
…-encode

Encode DIDComm messages before sent to the queue
  • Loading branch information
andrewwhitehead authored Sep 10, 2021
2 parents 31580f8 + 514301d commit b0009e1
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 6 deletions.
7 changes: 6 additions & 1 deletion aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,13 @@ async def _queue_external(
[outbound.target] if outbound.target else (outbound.target_list or [])
)
for target in targets:
encoded_outbound_message = (
await self.outbound_transport_manager.encode_outbound_message(
profile, outbound, target
)
)
await self.outbound_queue.enqueue_message(
outbound.payload, target.endpoint
encoded_outbound_message.payload, target.endpoint
)

return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE
Expand Down
15 changes: 14 additions & 1 deletion aries_cloudagent/core/tests/test_conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,22 +418,35 @@ async def test_handle_nots(self):
async def test_handle_outbound_queue(self):
builder: ContextBuilder = StubContextBuilder(self.test_settings)
conductor = test_module.Conductor(builder)
encoded_outbound_message_mock = async_mock.MagicMock(payload="message_payload")

payload = "{}"
message = OutboundMessage(
payload=payload,
connection_id="dummy-conn-id",
target=async_mock.MagicMock(),
target=async_mock.MagicMock(endpoint="endpoint"),
reply_to_verkey=TestDIDs.test_verkey,
)

await conductor.setup()
conductor.outbound_queue = async_mock.MagicMock(
enqueue_message=async_mock.CoroutineMock()
)
conductor.outbound_transport_manager = async_mock.MagicMock(
encode_outbound_message=async_mock.CoroutineMock(
return_value=encoded_outbound_message_mock
)
)

await conductor.queue_outbound(conductor.root_profile, message)

conductor.outbound_transport_manager.encode_outbound_message.assert_called_once_with(
conductor.root_profile, message, message.target
)
conductor.outbound_queue.enqueue_message.assert_called_once_with(
encoded_outbound_message_mock.payload, message.target.endpoint
)

async def test_handle_not_returned_ledger_x(self):
builder: ContextBuilder = StubContextBuilder(self.test_settings_admin)
conductor = test_module.Conductor(builder)
Expand Down
34 changes: 30 additions & 4 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,27 @@ def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
self.outbound_new.append(queued)
self.process_queued()

async def encode_outbound_message(
self, profile: Profile, outbound: OutboundMessage, target: ConnectionTarget
):
"""
Encode outbound message for the target.
Args:
profile: The active profile for the request
outbound: The outbound message to deliver
target: The outbound message target
"""

outbound_message = QueuedOutboundMessage(profile, outbound, target, None)

if outbound_message.message.enc_payload:
outbound_message.payload = outbound_message.message.enc_payload
else:
await self.perform_encode(outbound_message)

return outbound_message

def enqueue_webhook(
self,
topic: str,
Expand Down Expand Up @@ -415,16 +436,21 @@ async def _process_loop(self):

def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
"""Kick off encoding of a queued message."""

transport = self.get_transport_instance(queued.transport_id)

queued.task = self.task_queue.run(
self.perform_encode(queued),
self.perform_encode(queued, transport.wire_format),
lambda completed: self.finished_encode(queued, completed),
)
return queued.task

async def perform_encode(self, queued: QueuedOutboundMessage):
async def perform_encode(
self, queued: QueuedOutboundMessage, wire_format: BaseWireFormat = None
):
"""Perform message encoding."""
transport = self.get_transport_instance(queued.transport_id)
wire_format = transport.wire_format or self.context.inject(BaseWireFormat)
wire_format = wire_format or self.context.inject(BaseWireFormat)

session = await queued.profile.session()
queued.payload = await wire_format.encode_message(
session,
Expand Down
38 changes: 38 additions & 0 deletions aries_cloudagent/transport/outbound/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ....config.injection_context import InjectionContext
from ....connections.models.connection_target import ConnectionTarget
from ....core.in_memory import InMemoryProfile
from ...wire_format import BaseWireFormat

from .. import manager as test_module
from ..manager import (
Expand Down Expand Up @@ -316,3 +317,40 @@ async def test_finished_deliver_x_log_debug(self):
) as mock_process:
mock_logger_enabled.return_value = True # cover debug logging
mgr.finished_deliver(mock_queued, mock_completed_x)

async def test_should_encode_outbound_message(self):
context = InjectionContext()
base_wire_format = BaseWireFormat()
encoded_msg = "encoded_message"
base_wire_format.encode_message = async_mock.CoroutineMock(
return_value=encoded_msg
)
context.injector.bind_instance(BaseWireFormat, base_wire_format)
profile = InMemoryProfile.test_session().profile
profile.session = async_mock.CoroutineMock(return_value=async_mock.MagicMock())
outbound = async_mock.MagicMock(payload="payload", enc_payload=None)
target = async_mock.MagicMock()

mgr = OutboundTransportManager(context)
result = await mgr.encode_outbound_message(profile, outbound, target)

assert result.payload == encoded_msg
base_wire_format.encode_message.assert_called_once_with(
await profile.session(),
outbound.payload,
target.recipient_keys,
target.routing_keys,
target.sender_key,
)

async def test_should_not_encode_already_packed_message(self):
context = InjectionContext()
profile = InMemoryProfile.test_session().profile
enc_payload = "enc_payload"
outbound = async_mock.MagicMock(enc_payload=enc_payload)
target = async_mock.MagicMock()

mgr = OutboundTransportManager(context)
result = await mgr.encode_outbound_message(profile, outbound, target)

assert result.payload == enc_payload

0 comments on commit b0009e1

Please sign in to comment.