From fcb464c1117e5405db9e1d09a864b1ab3ba034aa Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Sun, 20 Mar 2022 21:56:57 +0100 Subject: [PATCH] feat: connectionless working with auto-accept Signed-off-by: Timo Glastra --- aries_cloudagent/core/conductor.py | 21 +- aries_cloudagent/core/dispatcher.py | 21 +- aries_cloudagent/core/oob_processor.py | 300 +++++ aries_cloudagent/messaging/request_context.py | 2 +- .../v1_0/handlers/credential_ack_handler.py | 21 +- .../v1_0/handlers/credential_issue_handler.py | 24 +- .../v1_0/handlers/credential_offer_handler.py | 36 +- .../credential_problem_report_handler.py | 12 +- .../handlers/credential_proposal_handler.py | 9 +- .../handlers/credential_request_handler.py | 19 +- .../issue_credential/v1_0/manager.py | 73 +- .../v1_0/models/credential_exchange.py | 14 +- .../protocols/out_of_band/v1_0/manager.py | 1114 +++++++---------- .../out_of_band/v1_0/models/invitation.py | 13 +- .../out_of_band/v1_0/models/oob_record.py | 172 +++ .../protocols/out_of_band/v1_0/routes.py | 2 +- .../v1_0/handlers/presentation_ack_handler.py | 16 +- .../v1_0/handlers/presentation_handler.py | 19 +- .../presentation_problem_report_handler.py | 12 +- .../handlers/presentation_proposal_handler.py | 9 +- .../handlers/presentation_request_handler.py | 29 +- .../protocols/present_proof/v1_0/manager.py | 60 +- .../protocols/present_proof/v1_0/routes.py | 2 - aries_cloudagent/transport/inbound/receipt.py | 25 + aries_cloudagent/transport/pack_format.py | 1 + aries_cloudagent/transport/wire_format.py | 1 + 26 files changed, 1300 insertions(+), 727 deletions(-) create mode 100644 aries_cloudagent/core/oob_processor.py create mode 100644 aries_cloudagent/protocols/out_of_band/v1_0/models/oob_record.py diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 6ed174545f..82ef06bea3 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -65,6 +65,7 @@ from ..vc.ld_proofs.document_loader import DocumentLoader from ..version import __version__, RECORD_TYPE_ACAPY_VERSION from ..wallet.did_info import DIDInfo +from .oob_processor import OobMessageProcessor from .dispatcher import Dispatcher from .util import STARTUP_EVENT_TOPIC, SHUTDOWN_EVENT_TOPIC @@ -206,6 +207,12 @@ async def setup(self): BaseMultitenantManager, MultitenantManagerProvider(self.root_profile) ) + # Bind oob message processor to be able to receive and process un-encrypted messages + context.injector.bind_instance( + OobMessageProcessor, + OobMessageProcessor(inbound_message_router=self.inbound_message_router), + ) + # Bind default PyLD document loader context.injector.bind_instance( DocumentLoader, DocumentLoader(self.root_profile) @@ -631,8 +638,10 @@ async def queue_outbound( message: An outbound message to be sent inbound: The inbound message that produced this response, if available """ + has_target = outbound.target or outbound.target_list + # populate connection target(s) - if not outbound.target and not outbound.target_list and outbound.connection_id: + if not has_target and outbound.connection_id: conn_mgr = ConnectionManager(profile) try: outbound.target_list = await self.dispatcher.run_task( @@ -649,6 +658,16 @@ async def queue_outbound( self.admin_server.notify_fatal_error() raise del conn_mgr + + # Find oob/connectionless target we can send the message to + elif not has_target and outbound.reply_thread_id: + message_processor = profile.inject(OobMessageProcessor) + outbound.target = await self.dispatcher.run_task( + message_processor.find_oob_target_for_outbound_message( + profile, outbound + ) + ) + # If ``self.outbound_queue`` is specified (usually set via # outbound queue `-oq` commandline option), use that external # queue. Else save the message to an internal queue. This diff --git a/aries_cloudagent/core/dispatcher.py b/aries_cloudagent/core/dispatcher.py index 98df420472..57ef012021 100644 --- a/aries_cloudagent/core/dispatcher.py +++ b/aries_cloudagent/core/dispatcher.py @@ -14,6 +14,8 @@ from aiohttp.web import HTTPException + +from ..connections.models.conn_record import ConnRecord from ..core.profile import Profile from ..messaging.agent_message import AgentMessage from ..messaging.base_message import BaseMessage @@ -173,11 +175,20 @@ async def handle_message( context.injector.bind_instance(BaseResponder, responder) - connection_mgr = ConnectionManager(profile) - connection = await connection_mgr.find_inbound_connection( - inbound_message.receipt - ) - del connection_mgr + # When processing oob attach message we supply the connection id + # associated with the inbound message + if inbound_message.connection_id: + async with self.profile.session() as session: + connection = await ConnRecord.retrieve_by_id( + session, inbound_message.connection_id + ) + else: + connection_mgr = ConnectionManager(profile) + connection = await connection_mgr.find_inbound_connection( + inbound_message.receipt + ) + del connection_mgr + if connection: inbound_message.connection_id = connection.connection_id diff --git a/aries_cloudagent/core/oob_processor.py b/aries_cloudagent/core/oob_processor.py new file mode 100644 index 0000000000..1e126ca5d4 --- /dev/null +++ b/aries_cloudagent/core/oob_processor.py @@ -0,0 +1,300 @@ +"""Oob message processor and functions.""" + +import json +import logging +from typing import Any, Callable, Dict, Optional, cast + + +from ..ledger.base import BaseLedger +from ..connections.models.conn_record import ConnRecord +from ..connections.models.connection_target import ConnectionTarget +from ..did.did_key import DIDKey +from ..messaging.decorators.service_decorator import ServiceDecorator +from ..messaging.request_context import RequestContext +from ..protocols.out_of_band.v1_0.messages.service import Service +from ..protocols.out_of_band.v1_0.models.oob_record import OobRecord +from ..storage.error import StorageNotFoundError +from ..transport.inbound.message import InboundMessage +from ..transport.outbound.message import OutboundMessage +from ..transport.wire_format import JsonWireFormat +from .profile import Profile + +LOGGER = logging.getLogger(__name__) + + +class OobMessageProcessor: + """Out of band message processor.""" + + def __init__( + self, + inbound_message_router: Callable[ + [Profile, InboundMessage, Optional[bool]], None + ], + ) -> None: + """ + Initialize an inbound OOB message processor + + Args: + inbound_message_router: Method to create a new inbound session + + """ + self._inbound_message_router = inbound_message_router + self.wire_format = JsonWireFormat() + + async def find_oob_target_for_outbound_message( + self, profile: Profile, outbound_message: OutboundMessage + ) -> Optional[ConnectionTarget]: + try: + async with profile.session() as session: + # Try to find the oob record for the outbound message: + oob_record = await OobRecord.retrieve_by_tag_filter( + session, {"attach_thread_id": outbound_message.reply_thread_id} + ) + + their_service = oob_record.their_service + their_service = ServiceDecorator.deserialize(their_service) + + # FIXME: integrate with mediation + our_service = ServiceDecorator( + recipient_keys=[oob_record.our_recipient_key], + endpoint=profile.settings.get("default_endpoint"), + routing_keys=[], + ) + + # Attach ~service decorator so other message can respond + message = json.loads(outbound_message.payload) + if not message.get("~service"): + message["~service"] = our_service.serialize() + + # TODO: state is somewhat done, but we need it for connectionless exchange + # if is_first_response: + message["~thread"] = { + **message.get("~thread", {}), + "pthid": oob_record.invi_msg_id, + } + + outbound_message.payload = json.dumps(message) + + return ConnectionTarget( + endpoint=their_service.endpoint, + recipient_keys=their_service.recipient_keys, + routing_keys=their_service.routing_keys, + sender_key=oob_record.our_recipient_key, + ) + except StorageNotFoundError: + return None + + async def find_oob_record_for_inbound_message( + self, context: RequestContext + ) -> Optional[OobRecord]: + message_type = context.message._type + oob_record = None + + async with context.profile.session() as session: + # First try to find the oob record based on the associated pthid + if context.message_receipt.parent_thread_id: + try: + LOGGER.debug( + f"Retrieving OOB record using pthid {context.message_receipt.parent_thread_id} for message type {message_type}" + ) + oob_record = await OobRecord.retrieve_by_tag_filter( + session, + {"invi_msg_id": context.message_receipt.parent_thread_id}, + ) + except StorageNotFoundError: + # Fine if record is not found + pass + # Otherwise try to find it using the attach thread id. This is only needed + # for connectionless exchanges where every handlers needs the context of the oob + # record for verification. We could attach the oob_record to all messages, even if + # we have a connection, but it would add another query to all inbound messages. + if ( + not oob_record + and not context.connection_record + and context.message_receipt.thread_id + and context.message_receipt.recipient_verkey + ): + try: + LOGGER.debug( + f"Retrieving OOB record using thid {context.message_receipt.thread_id} and recipient verkey {context.message_receipt.recipient_verkey} for message type {message_type}" + ) + oob_record = await OobRecord.retrieve_by_tag_filter( + session, + { + "attach_thread_id": context.message_receipt.thread_id, + "our_recipient_key": context.message_receipt.recipient_verkey, + }, + ) + except StorageNotFoundError: + # Fine if record is not found + pass + + # If not oob record was found we can return early without oob record + if not oob_record: + return None + + LOGGER.debug( + f"Found out of band record for inbound message with type {message_type}: %s", + oob_record, + ) + + # If the connection does not match with the connection id associated with the + # oob record we don't want to associate the oob record to the current context + # This is not the case if the state is await response, in this case we might want + # to update the connection id on the oob record + if ( + # Only if we created the invitation + oob_record.role == OobRecord.ROLE_SENDER + # If connection is present and not same as oob_record conn id + and context.connection_record + and context.connection_record.connection_id != oob_record.connection_id + ): + LOGGER.debug( + f"Oob record connection id {oob_record.connection_id} is different from inbound message connection {context.connection_record.connection_id}", + ) + # Mismatch in connection id's in only allowed in state await response (connection id can change bc of reuse) + if oob_record.state != OobRecord.STATE_AWAIT_RESPONSE: + LOGGER.debug( + f"Inbound message has incorrect connection_id {context.connection_record.connection_id}. Oob record {oob_record.oob_id} associated with connection id {oob_record.connection_id}" + ) + return None + + # If the state is await response, and there are attachments we want to update the connection id + # on the oob record. In case no request_attach is present, this is handled by the reuse handlers + if ( + oob_record.invitation.requests_attach + and oob_record.state == OobRecord.STATE_AWAIT_RESPONSE + ): + LOGGER.debug( + f"Removing stale connection {oob_record.connection_id} due to connection reuse" + ) + # Remove stale connection due to connection reuse + if oob_record.connection_id: + async with context.profile.session() as session: + old_conn_record = await ConnRecord.retrieve_by_id( + session, oob_record.connection_id + ) + await old_conn_record.delete_record(session) + + oob_record.connection_id = context.connection_record.connection_id + + # If no attach_thread_id is stored yet we need to match the current message thread_id against the attached messages + # in the oob invitation + if not oob_record.attach_thread_id and oob_record.invitation.requests_attach: + # Check if the current message thread_id corresponds to one of the invitation ~thread.thid + allowed_thread_ids = [ + self._get_thread_id(attachment.content) + for attachment in oob_record.invitation.requests_attach + ] + + if not context.message_receipt.thread_id in allowed_thread_ids: + LOGGER.debug( + f"Inbound message is for not allowed thread {context.message_receipt.thread_id}. Allowed threads are {allowed_thread_ids}" + ) + return None + + oob_record.attach_thread_id = context.message_receipt.thread_id + elif ( + oob_record.attach_thread_id + and context.message_receipt.thread_id != oob_record.attach_thread_id + ): + LOGGER.debug( + f"Inbound message thread id {context.message_receipt.thread_id} does not match oob record thread id {oob_record.attach_thread_id}" + ) + return None + + their_service = ( + cast( + ServiceDecorator, + ServiceDecorator.deserialize(oob_record.their_service), + ) + if oob_record.their_service + else None + ) + + # Verify the sender key is present in their service in our record + # If we don't have the sender verkey stored yet we can allow any key + if ( + their_service + # FIXME: does this mean anyone with anoncreds can send a message? + and context.message_receipt.sender_verkey + and context.message_receipt.sender_verkey + not in their_service.recipient_keys + ): + LOGGER.debug( + "Inbound message sender verkey does not match stored service on oob record" + ) + return None + + # If the message has a ~service decorator we save it in the oob record so we can reply to this message + if context._message._service: + # TODO: what should we do if the keys don't match? I would say for now we require the complete + # oob exchange to use the same keys + oob_record.their_service = context.message._service.serialize() + + async with context.profile.session() as session: + await oob_record.save(session, reason="Update their service in oob record") + + return oob_record + + async def handle_message( + self, profile: Profile, message: Dict[str, Any], oob_record: OobRecord + ): + """Message handler for inbound messages.""" + + message_str = json.dumps(message) + + async with profile.session() as session: + message_dict, receipt = await self.wire_format.parse_message( + session, message_str + ) + + inbound_message = InboundMessage( + payload=message_dict, + connection_id=oob_record.connection_id, + receipt=receipt, + ) + # Create ~service from the oob service + + if not oob_record.connection_id: + service = oob_record.invitation.services[0] + + if isinstance(service, str): + async with session.inject(BaseLedger) as ledger: + endpoint = await ledger.get_endpoint_for_did(service) + verkey = await ledger.get_key_for_did(service) + + service_decorator = ServiceDecorator( + endpoint=endpoint, + recipient_keys=[verkey], + routing_keys=[], + ) + else: + service_decorator = self._service_decorator_from_service(service) + + oob_record.their_service = service_decorator.serialize() + + oob_record.attach_thread_id = inbound_message.receipt.thread_id + + await oob_record.save(session) + + self._inbound_message_router(profile, inbound_message, False) + + def _get_thread_id(self, message: Dict[str, Any]) -> str: + return message.get("~thread", {}).get("thid") or message.get("@id") + + def _service_decorator_from_service(self, service: Service) -> ServiceDecorator: + # Create ~service decorator from the oob service + recipient_keys = [ + DIDKey.from_did(did_key).public_key_b58 + for did_key in service.recipient_keys + ] + routing_keys = [ + DIDKey.from_did(did_key).public_key_b58 for did_key in service.routing_keys + ] + + return ServiceDecorator( + endpoint=service.service_endpoint, + recipient_keys=recipient_keys, + routing_keys=routing_keys, + ) diff --git a/aries_cloudagent/messaging/request_context.py b/aries_cloudagent/messaging/request_context.py index 97e0d1f9ef..715591cb2d 100644 --- a/aries_cloudagent/messaging/request_context.py +++ b/aries_cloudagent/messaging/request_context.py @@ -61,7 +61,7 @@ def connection_ready(self, active: bool): self._connection_ready = active @property - def connection_record(self) -> ConnRecord: + def connection_record(self) -> Optional[ConnRecord]: """Accessor for the related connection record.""" return self._connection_record diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_ack_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_ack_handler.py index 26faeb8915..da5712e7ab 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_ack_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_ack_handler.py @@ -1,5 +1,6 @@ """Credential ack message handler.""" +from .....core.oob_processor import OobMessageProcessor from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.request_context import RequestContext from .....messaging.responder import BaseResponder @@ -29,12 +30,26 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for credential ack") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for credential ack not ready") + + # Find associated oob record + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for credential ack" + ) credential_manager = CredentialManager(context.profile) await credential_manager.receive_credential_ack( - context.message, context.connection_record.connection_id + context.message, + context.connection_record.connection_id + if context.connection_record + else None, ) trace_event( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py index 9048311cfe..f34560c624 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py @@ -1,5 +1,9 @@ """Credential issue message handler.""" +from aries_cloudagent.protocols.connections.v1_0.handlers.tests.test_request_handler import ( + connection_record, +) +from .....core.oob_processor import OobMessageProcessor from .....indy.holder import IndyHolderError from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.models.base import BaseModelError @@ -34,12 +38,26 @@ async def handle(self, context: RequestContext, responder: BaseResponder): "Received credential message: %s", context.message.serialize(as_string=True) ) - if not context.connection_ready: - raise HandlerException("No connection established for credential issue") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for credential not ready") + + # Find associated oob record + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for credential" + ) credential_manager = CredentialManager(profile) cred_ex_record = await credential_manager.receive_credential( - context.message, context.connection_record.connection_id + context.message, + context.connection_record.connection_id + if context.connection_record + else None, ) # mgr only finds, saves record: on exception, saving state null is hopeless r_time = trace_event( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py index af1f3844ae..ded7722293 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py @@ -1,5 +1,8 @@ """Credential offer message handler.""" +from base58 import b58decode, b58encode + +from .....core.oob_processor import OobMessageProcessor from .....indy.holder import IndyHolderError from .....ledger.error import LedgerError from .....messaging.base_handler import BaseHandler, HandlerException @@ -36,12 +39,29 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for credential offer") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for credential offer not ready") + + # Find associated oob record + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for credential offer" + ) + + connection_id = ( + context.connection_record.connection_id + if context.connection_record + else None + ) credential_manager = CredentialManager(profile) cred_ex_record = await credential_manager.receive_offer( - context.message, context.connection_record.connection_id + context.message, connection_id ) # mgr only finds, saves record: on exception, saving state null is hopeless r_time = trace_event( @@ -51,6 +71,14 @@ async def handle(self, context: RequestContext, responder: BaseResponder): perf_counter=r_time, ) + if context.connection_record: + holder_did = context.connection_record.my_did + else: + # Transform recipient key into did + holder_did = b58encode(b58decode(oob_record.our_recipient_key)[:16]).decode( + "utf-8" + ) + # If auto respond is turned on, automatically reply with credential request if cred_ex_record and context.settings.get( "debug.auto_respond_credential_offer" @@ -62,7 +90,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): credential_request_message, ) = await credential_manager.create_request( cred_ex_record=cred_ex_record, - holder_did=context.connection_record.my_did, + holder_did=holder_did, ) await responder.send_reply(credential_request_message) except ( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_problem_report_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_problem_report_handler.py index bb88d6614f..24d2b25079 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_problem_report_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_problem_report_handler.py @@ -1,6 +1,6 @@ """Credential problem report message handler.""" -from .....messaging.base_handler import BaseHandler +from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.request_context import RequestContext from .....messaging.responder import BaseResponder from .....storage.error import StorageError, StorageNotFoundError @@ -26,6 +26,16 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) assert isinstance(context.message, CredentialProblemReport) + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException( + "Connection used for credential problem report not ready" + ) + elif not context.connection_record: + raise HandlerException( + "Connectionless not supported for credential problem report" + ) + credential_manager = CredentialManager(context.profile) try: await credential_manager.receive_problem_report( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_proposal_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_proposal_handler.py index 502eee171b..b42338aa10 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_proposal_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_proposal_handler.py @@ -37,8 +37,13 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for credential proposal") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for credential proposal not ready") + elif not context.connection_record: + raise HandlerException( + "Connectionless not supported for credential proposal" + ) credential_manager = CredentialManager(profile) cred_ex_record = await credential_manager.receive_proposal( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py index bc6d028c10..eecbf2e348 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py @@ -1,5 +1,6 @@ """Credential request message handler.""" +from .....core.oob_processor import OobMessageProcessor from .....indy.issuer import IndyIssuerError from .....ledger.error import LedgerError from .....messaging.base_handler import BaseHandler, HandlerException @@ -36,12 +37,24 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for credential request") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for credential request not ready") + + # Find associated oob record. If the credential offer was created as an oob attachment + # the presentation exchange record won't have a connection id (yet) + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for credential request" + ) credential_manager = CredentialManager(profile) cred_ex_record = await credential_manager.receive_request( - context.message, context.connection_record.connection_id + context.message, context.connection_record, oob_record ) # mgr only finds, saves record: on exception, saving state null is hopeless r_time = trace_event( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py index 7934d1725d..f1d0423a9b 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py @@ -4,7 +4,7 @@ import json import logging -from typing import Mapping, Tuple +from typing import Mapping, Optional, Tuple from ....cache.base import BaseCache from ....core.error import BaseError @@ -27,7 +27,9 @@ from ....revocation.util import notify_revocation_reg_event from ....storage.base import BaseStorage from ....storage.error import StorageError, StorageNotFoundError +from ....connections.models.conn_record import ConnRecord +from ...out_of_band.v1_0.models.oob_record import OobRecord from .messages.credential_ack import CredentialAck from .messages.credential_issue import CredentialIssue from .messages.credential_offer import CredentialOffer @@ -325,7 +327,7 @@ async def _create(cred_def_id): return (cred_ex_record, credential_offer_message) async def receive_offer( - self, message: CredentialOffer, connection_id: str + self, message: CredentialOffer, connection_id: Optional[str] ) -> V10CredentialExchange: """ Receive a credential offer. @@ -352,7 +354,11 @@ async def receive_offer( try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - txn, connection_id, message._thread_id, for_update=True + txn, + connection_id, + message._thread_id, + role=V10CredentialExchange.ROLE_HOLDER, + for_update=True, ) ) except StorageNotFoundError: # issuer sent this offer free of any proposal @@ -375,6 +381,7 @@ async def receive_offer( ) cred_ex_record.credential_proposal_dict = credential_proposal_dict + cred_ex_record.credential_offer_dict = message cred_ex_record.credential_offer = indy_offer cred_ex_record.state = V10CredentialExchange.STATE_OFFER_RECEIVED cred_ex_record.schema_id = schema_id @@ -465,7 +472,10 @@ async def _create(): credential_request_message = CredentialRequest( requests_attach=[CredentialRequest.wrap_indy_cred_req(cred_req_ser)] ) - credential_request_message._thread = {"thid": cred_ex_record.thread_id} + # Assign thid (and optionally pthid) to message + credential_request_message.assign_thread_from( + cred_ex_record.credential_offer_dict + ) credential_request_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) @@ -489,7 +499,12 @@ async def _create(): return (cred_ex_record, credential_request_message) - async def receive_request(self, message: CredentialRequest, connection_id: str): + async def receive_request( + self, + message: CredentialRequest, + connection_record: Optional[ConnRecord], + oob_record: Optional[OobRecord], + ): """ Receive a credential request. @@ -503,26 +518,26 @@ async def receive_request(self, message: CredentialRequest, connection_id: str): assert len(message.requests_attach or []) == 1 credential_request = message.indy_cred_req(0) + # connection_id is None in the record if this is in response to + # an request~attach from an OOB message. If so, we do not want to filter + # the record by connection_id. + connection_id = None if oob_record else connection_record.connection_id + async with self._profile.transaction() as txn: try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - txn, connection_id, message._thread_id, for_update=True - ) - ) - except StorageNotFoundError: - try: - cred_ex_record = await V10CredentialExchange.retrieve_by_tag_filter( txn, - {"thread_id": message._thread_id}, - {"connection_id": None}, + connection_id, + message._thread_id, + role=V10CredentialExchange.ROLE_ISSUER, for_update=True, ) - cred_ex_record.connection_id = connection_id - except StorageNotFoundError: - raise CredentialManagerError( - "Indy issue credential format can't start from credential request" - ) from None + ) + except StorageNotFoundError: + raise CredentialManagerError( + "Indy issue credential format can't start from credential request" + ) from None if cred_ex_record.state != V10CredentialExchange.STATE_OFFER_SENT: LOGGER.error( "Skipping credential request; exchange state is %s (id=%s)", @@ -530,6 +545,10 @@ async def receive_request(self, message: CredentialRequest, connection_id: str): cred_ex_record.credential_exchange_id, ) return None + + if connection_record: + cred_ex_record.connection_id = connection_record.connection_id + cred_ex_record.credential_request = credential_request cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_RECEIVED await cred_ex_record.save(txn, reason="receive credential request") @@ -745,7 +764,7 @@ async def issue_credential( return (cred_ex_record, credential_message) async def receive_credential( - self, message: CredentialIssue, connection_id: str + self, message: CredentialIssue, connection_id: Optional[str] ) -> V10CredentialExchange: """ Receive a credential from an issuer. @@ -763,7 +782,11 @@ async def receive_credential( try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - txn, connection_id, message._thread_id, for_update=True + txn, + connection_id, + message._thread_id, + role=V10CredentialExchange.ROLE_HOLDER, + for_update=True, ) ) except StorageNotFoundError: @@ -948,8 +971,8 @@ async def send_credential_ack( return (cred_ex_record, credential_ack_message) async def receive_credential_ack( - self, message: CredentialAck, connection_id: str - ) -> V10CredentialExchange: + self, message: CredentialAck, connection_id: Optional[str] + ) -> Optional[V10CredentialExchange]: """ Receive credential ack from holder. @@ -961,7 +984,11 @@ async def receive_credential_ack( try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - txn, connection_id, message._thread_id, for_update=True + txn, + connection_id, + message._thread_id, + role=V10CredentialExchange.ROLE_ISSUER, + for_update=True, ) ) except StorageNotFoundError: diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py index 5df1eedd74..a91d5c03b6 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py @@ -2,7 +2,7 @@ import logging -from typing import Any, Mapping, Union +from typing import Any, Mapping, Optional, Union from marshmallow import fields, validate @@ -264,21 +264,27 @@ def record_value(self) -> dict: async def retrieve_by_connection_and_thread( cls, session: ProfileSession, - connection_id: str, + connection_id: Optional[str], thread_id: str, + role: Optional[str] = None, *, for_update=False, ) -> "V10CredentialExchange": """Retrieve a credential exchange record by connection and thread ID.""" - cache_key = f"credential_exchange_ctidx::{connection_id}::{thread_id}" + cache_key = f"credential_exchange_ctidx::{connection_id}::{thread_id}::{role}" record_id = await cls.get_cached_key(session, cache_key) if record_id: record = await cls.retrieve_by_id(session, record_id, for_update=for_update) else: + post_filter = {} + if role: + post_filter["role"] = role + if connection_id: + post_filter["connection_id"] = connection_id record = await cls.retrieve_by_tag_filter( session, {"thread_id": thread_id}, - {"connection_id": connection_id} if connection_id else None, + post_filter, for_update=for_update, ) await cls.set_cached_key(session, cache_key, record.credential_exchange_id) diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/manager.py b/aries_cloudagent/protocols/out_of_band/v1_0/manager.py index 9fd4542c02..12309e2b97 100644 --- a/aries_cloudagent/protocols/out_of_band/v1_0/manager.py +++ b/aries_cloudagent/protocols/out_of_band/v1_0/manager.py @@ -1,56 +1,43 @@ """Classes to manage connections.""" import asyncio -import json import logging +import re +from typing import Mapping, Optional, Sequence +import uuid -from typing import Mapping, Sequence +from aries_cloudagent.core.event_bus import EventBus from ....connections.base_manager import BaseConnectionManager from ....connections.models.conn_record import ConnRecord from ....connections.util import mediation_record_if_id from ....core.error import BaseError +from ....core.oob_processor import OobMessageProcessor from ....core.profile import Profile from ....did.did_key import DIDKey -from ....indy.holder import IndyHolder -from ....indy.models.xform import indy_proof_req_preview2indy_requested_creds from ....messaging.decorators.attach_decorator import AttachDecorator from ....messaging.responder import BaseResponder from ....multitenant.base import BaseMultitenantManager from ....storage.error import StorageNotFoundError from ....transport.inbound.receipt import MessageReceipt from ....wallet.base import BaseWallet -from ....wallet.util import b64_to_bytes from ....wallet.key_type import KeyType - -from ...coordinate_mediation.v1_0.manager import MediationManager from ...connections.v1_0.manager import ConnectionManager from ...connections.v1_0.messages.connection_invitation import ConnectionInvitation +from ...coordinate_mediation.v1_0.manager import MediationManager from ...didcomm_prefix import DIDCommPrefix from ...didexchange.v1_0.manager import DIDXManager -from ...issue_credential.v1_0.manager import CredentialManager as V10CredManager -from ...issue_credential.v1_0.messages.credential_offer import ( - CredentialOffer as V10CredOffer, -) -from ...issue_credential.v1_0.message_types import CREDENTIAL_OFFER from ...issue_credential.v1_0.models.credential_exchange import V10CredentialExchange -from ...issue_credential.v2_0.manager import V20CredManager -from ...issue_credential.v2_0.messages.cred_offer import V20CredOffer -from ...issue_credential.v2_0.message_types import CRED_20_OFFER from ...issue_credential.v2_0.models.cred_ex_record import V20CredExRecord -from ...present_proof.v1_0.manager import PresentationManager -from ...present_proof.v1_0.message_types import PRESENTATION_REQUEST from ...present_proof.v1_0.models.presentation_exchange import V10PresentationExchange -from ...present_proof.v2_0.manager import V20PresManager -from ...present_proof.v2_0.message_types import PRES_20_REQUEST from ...present_proof.v2_0.models.pres_exchange import V20PresExRecord - from .messages.invitation import HSProto, InvitationMessage from .messages.problem_report import OOBProblemReport from .messages.reuse import HandshakeReuse from .messages.reuse_accept import HandshakeReuseAccept from .messages.service import Service as ServiceMessage from .models.invitation import InvitationRecord +from .models.oob_record import OobRecord LOGGER = logging.getLogger(__name__) REUSE_WEBHOOK_TOPIC = "acapy::webhook::connection_reuse" @@ -159,11 +146,20 @@ async def create_invitation( "Cannot store metadata on public invitations" ) + if attachments and multi_use: + raise OutOfBandManagerError( + "Cannot create multi use invitation with attachments" + ) + + invitation_message_id = str(uuid.uuid4()) + message_attachments = [] for atch in attachments or []: a_type = atch.get("type") a_id = atch.get("id") + message = None + if a_type == "credential-offer": try: async with self.profile.session() as session: @@ -171,22 +167,15 @@ async def create_invitation( session, a_id, ) - message_attachments.append( - InvitationMessage.wrap_message( - cred_ex_rec.credential_offer_dict.serialize() - ) - ) + message = cred_ex_rec.credential_offer_dict.serialize() + except StorageNotFoundError: async with self.profile.session() as session: cred_ex_rec = await V20CredExRecord.retrieve_by_id( session, a_id, ) - message_attachments.append( - InvitationMessage.wrap_message( - cred_ex_rec.cred_offer.serialize() - ) - ) + message = cred_ex_rec.cred_offer.serialize() elif a_type == "present-proof": try: async with self.profile.session() as session: @@ -194,25 +183,24 @@ async def create_invitation( session, a_id, ) - message_attachments.append( - InvitationMessage.wrap_message( - pres_ex_rec.presentation_request_dict.serialize() - ) - ) + message = pres_ex_rec.presentation_request_dict.serialize() except StorageNotFoundError: async with self.profile.session() as session: pres_ex_rec = await V20PresExRecord.retrieve_by_id( session, a_id, ) - message_attachments.append( - InvitationMessage.wrap_message( - pres_ex_rec.pres_request.serialize() - ) - ) + message = pres_ex_rec.pres_request.serialize() else: raise OutOfBandManagerError(f"Unknown attachment type: {a_type}") + # Assign pthid to the attached message + message["~thread"] = { + **message.get("~thread", {}), + "pthid": invitation_message_id, + } + message_attachments.append(InvitationMessage.wrap_message(message)) + handshake_protocols = [ DIDCommPrefix.qualify_current(hsp.name) for hsp in hs_protos or [] ] or None @@ -220,6 +208,9 @@ async def create_invitation( hs_protos[0].name if hs_protos and len(hs_protos) >= 1 else None ) + our_recipient_key = None + conn_rec = None + if public: if not self.profile.settings.get("public_invites"): raise OutOfBandManagerError("Public invitations are not enabled") @@ -233,6 +224,7 @@ async def create_invitation( ) invi_msg = InvitationMessage( # create invitation message + _id=invitation_message_id, label=my_label or self.profile.settings.get("default_label"), handshake_protocols=handshake_protocols, requests_attach=message_attachments, @@ -242,66 +234,76 @@ async def create_invitation( public_did.verkey, keylist_updates ) + our_recipient_key = public_did.verkey + endpoint, *_ = await self.resolve_invitation(public_did.did) invi_url = invi_msg.to_url(endpoint) - conn_rec = ConnRecord( # create connection record - invitation_key=public_did.verkey, - invitation_msg_id=invi_msg._id, - their_role=ConnRecord.Role.REQUESTER.rfc23, - state=ConnRecord.State.INVITATION.rfc23, - accept=ConnRecord.ACCEPT_AUTO if accept else ConnRecord.ACCEPT_MANUAL, - alias=alias, - connection_protocol=connection_protocol, - ) - - async with self.profile.session() as session: - await conn_rec.save(session, reason="Created new invitation") - await conn_rec.attach_invitation(session, invi_msg) - if multitenant_mgr and wallet_id: # add mapping for multitenant relay await multitenant_mgr.add_key( wallet_id, public_did.verkey, skip_if_exists=True ) - else: - invitation_mode = ( - ConnRecord.INVITATION_MODE_MULTI - if multi_use - else ConnRecord.INVITATION_MODE_ONCE - ) + # Only create connection record if hanshake_protocols is defined + if handshake_protocols: + conn_rec = ConnRecord( # create connection record + invitation_key=public_did.verkey, + invitation_msg_id=invi_msg._id, + their_role=ConnRecord.Role.REQUESTER.rfc23, + state=ConnRecord.State.INVITATION.rfc23, + accept=ConnRecord.ACCEPT_AUTO + if accept + else ConnRecord.ACCEPT_MANUAL, + alias=alias, + connection_protocol=connection_protocol, + ) + + async with self.profile.session() as session: + await conn_rec.save(session, reason="Created new invitation") + await conn_rec.attach_invitation(session, invi_msg) + else: if not my_endpoint: my_endpoint = self.profile.settings.get("default_endpoint") - # Create and store new invitation key - + # Create and store new key for exchange async with self.profile.session() as session: wallet = session.inject(BaseWallet) connection_key = await wallet.create_signing_key(KeyType.ED25519) keylist_updates = await mediation_mgr.add_key( connection_key.verkey, keylist_updates ) + + our_recipient_key = connection_key.verkey # Add mapping for multitenant relay if multitenant_mgr and wallet_id: await multitenant_mgr.add_key(wallet_id, connection_key.verkey) # Initializing InvitationMessage here to include # invitation_msg_id in webhook poyload - invi_msg = InvitationMessage() - # Create connection record - conn_rec = ConnRecord( - invitation_key=connection_key.verkey, - their_role=ConnRecord.Role.REQUESTER.rfc23, - state=ConnRecord.State.INVITATION.rfc23, - accept=ConnRecord.ACCEPT_AUTO if accept else ConnRecord.ACCEPT_MANUAL, - invitation_mode=invitation_mode, - alias=alias, - connection_protocol=connection_protocol, - invitation_msg_id=invi_msg._id, - ) + invi_msg = InvitationMessage(_id=invitation_message_id) - async with self.profile.session() as session: - await conn_rec.save(session, reason="Created new connection") + if handshake_protocols: + invitation_mode = ( + ConnRecord.INVITATION_MODE_MULTI + if multi_use + else ConnRecord.INVITATION_MODE_ONCE + ) + # Create connection record + conn_rec = ConnRecord( + invitation_key=connection_key.verkey, + their_role=ConnRecord.Role.REQUESTER.rfc23, + state=ConnRecord.State.INVITATION.rfc23, + accept=ConnRecord.ACCEPT_AUTO + if accept + else ConnRecord.ACCEPT_MANUAL, + invitation_mode=invitation_mode, + alias=alias, + connection_protocol=connection_protocol, + invitation_msg_id=invi_msg._id, + ) + + async with self.profile.session() as session: + await conn_rec.save(session, reason="Created new connection") routing_keys = [] # The base wallet can act as a mediator for all tenants @@ -364,16 +366,28 @@ async def create_invitation( invi_url = invi_msg.to_url() # Update connection record + if conn_rec: + async with self.profile.session() as session: + await conn_rec.attach_invitation(session, invi_msg) - async with self.profile.session() as session: - await conn_rec.attach_invitation(session, invi_msg) + if metadata: + for key, value in metadata.items(): + await conn_rec.metadata_set(session, key, value) - if metadata: - async with self.profile.session() as session: - for key, value in metadata.items(): - await conn_rec.metadata_set(session, key, value) + oob_record = OobRecord( + role=OobRecord.ROLE_SENDER, + state=OobRecord.STATE_AWAIT_RESPONSE, + connection_id=conn_rec.connection_id if conn_rec else None, + invi_msg_id=invi_msg._id, + invitation=invi_msg, + our_recipient_key=our_recipient_key, + ) + + async with self.profile.session() as session: + await oob_record.save(session, reason="Created new oob invitation") return InvitationRecord( # for return via admin API, not storage + oob_id=oob_record.oob_id, state=InvitationRecord.STATE_INITIAL, invi_msg_id=invi_msg._id, invitation=invi_msg, @@ -384,10 +398,10 @@ async def receive_invitation( self, invitation: InvitationMessage, use_existing_connection: bool = True, - auto_accept: bool = None, - alias: str = None, - mediation_id: str = None, - ) -> ConnRecord: + auto_accept: Optional[bool] = None, + alias: Optional[str] = None, + mediation_id: Optional[str] = None, + ) -> OobRecord: """ Receive an out of band invitation message. @@ -416,558 +430,371 @@ async def receive_invitation( raise OutOfBandManagerError( "Invitation must specify handshake_protocols, requests_attach, or both" ) + # Get the single service item oob_service_item = invitation.services[0] - if isinstance(oob_service_item, ServiceMessage): - service = oob_service_item - public_did = None - else: - # If it's in the did format, we need to convert to a full service block - # An existing connection can only be reused based on a public DID - # in an out-of-band message (RFC 0434). - service_did = oob_service_item + # Get the DID public did, if any + public_did = None + if isinstance(oob_service_item, str): + public_did = oob_service_item.split(":")[-1] - # TODO: resolve_invitation should resolve key_info objects - # or something else that includes the key type. We now assume - # ED25519 keys - endpoint, recipient_keys, routing_keys = await self.resolve_invitation( - service_did - ) - public_did = service_did.split(":")[-1] - service = ServiceMessage.deserialize( - { - "id": "#inline", - "type": "did-communication", - "recipientKeys": [ - DIDKey.from_public_key_b58(key, KeyType.ED25519).did - for key in recipient_keys - ], - "routingKeys": [ - DIDKey.from_public_key_b58(key, KeyType.ED25519).did - for key in routing_keys - ], - "serviceEndpoint": endpoint, - } - ) + conn_rec = None - unq_handshake_protos = [ - HSProto.get(hsp) - for hsp in dict.fromkeys( - [ - DIDCommPrefix.unqualify(proto) - for proto in invitation.handshake_protocols - ] + # Find existing connection - only if started by an invitation with Public DID + # and use_existing_connection is true + if ( + public_did is not None and use_existing_connection + ): # invite has public DID: seek existing connection + LOGGER.debug( + f"Trying to find existing connection for oob invitation with did {public_did}" ) - ] - - # Reuse Connection - only if started by an invitation with Public DID - conn_rec = None - if public_did is not None: # invite has public DID: seek existing connection async with self._profile.session() as session: conn_rec = await ConnRecord.find_existing_connection( session=session, their_public_did=public_did ) - if conn_rec is not None: - num_included_protocols = len(unq_handshake_protos) - num_included_req_attachments = len(invitation.requests_attach) - # With handshake protocol, request attachment; use existing connection - if ( - num_included_protocols >= 1 - and num_included_req_attachments == 0 - and use_existing_connection - ): - await self.create_handshake_reuse_message( - invi_msg=invitation, - conn_record=conn_rec, - ) - try: - await asyncio.wait_for( - self.check_reuse_msg_state( - conn_rec=conn_rec, - ), - 15, - ) - async with self.profile.session() as session: - await conn_rec.metadata_delete( - session=session, key="reuse_msg_id" - ) - - msg_state = await conn_rec.metadata_get( - session, "reuse_msg_state" - ) - if msg_state == "not_accepted": - conn_rec = None - else: - async with self.profile.session() as session: - await conn_rec.metadata_delete( - session=session, key="reuse_msg_state" - ) - # refetch connection for accurate state after handshake - conn_rec = await ConnRecord.retrieve_by_id( - session=session, record_id=conn_rec.connection_id - ) - except asyncio.TimeoutError: - # If no reuse_accepted or problem_report message was received within - # the 15s timeout then a new connection to be created - async with self.profile.session() as session: - sent_reuse_msg_id = await conn_rec.metadata_get( - session=session, key="reuse_msg_id" - ) - await conn_rec.metadata_delete( - session=session, key="reuse_msg_id" - ) - await conn_rec.metadata_delete( - session=session, key="reuse_msg_state" - ) - conn_rec.state = ConnRecord.State.ABANDONED.rfc160 - await conn_rec.save( - session, reason="No HandshakeReuseAccept message received" - ) - # Emit webhook - await self.profile.notify( - REUSE_ACCEPTED_WEBHOOK_TOPIC, - { - "thread_id": sent_reuse_msg_id, - "connection_id": conn_rec.connection_id, - "state": "rejected", - "comment": ( - "No HandshakeReuseAccept message received, " - f"connection {conn_rec.connection_id} ", - f"and invitation {invitation._id}", - ), - }, - ) - conn_rec = None - # Inverse of the following cases - # Handshake_Protocol not included - # Request_Attachment included - # Use_Existing_Connection Yes - # Handshake_Protocol included - # Request_Attachment included - # Use_Existing_Connection Yes - elif not ( - ( - num_included_protocols == 0 - and num_included_req_attachments >= 1 - and use_existing_connection - ) - or ( - num_included_protocols >= 1 - and num_included_req_attachments >= 1 - and use_existing_connection - ) - ): - conn_rec = None - if conn_rec is None: - if not unq_handshake_protos: - raise OutOfBandManagerError( - "No existing connection exists and handshake_protocol is missing" - ) - # Create a new connection - for proto in unq_handshake_protos: - if proto is HSProto.RFC23: - didx_mgr = DIDXManager(self.profile) - conn_rec = await didx_mgr.receive_invitation( - invitation=invitation, - their_public_did=public_did, - auto_accept=auto_accept, - alias=alias, - mediation_id=mediation_id, - ) - elif proto is HSProto.RFC160: - service.recipient_keys = [ - DIDKey.from_did(key).public_key_b58 - for key in service.recipient_keys or [] - ] - service.routing_keys = [ - DIDKey.from_did(key).public_key_b58 - for key in service.routing_keys - ] or [] - connection_invitation = ConnectionInvitation.deserialize( - { - "@id": invitation._id, - "@type": DIDCommPrefix.qualify_current(proto.name), - "label": invitation.label, - "recipientKeys": service.recipient_keys, - "serviceEndpoint": service.service_endpoint, - "routingKeys": service.routing_keys, - } - ) - conn_mgr = ConnectionManager(self.profile) - conn_rec = await conn_mgr.receive_invitation( - invitation=connection_invitation, - their_public_did=public_did, - auto_accept=auto_accept, - alias=alias, - mediation_id=mediation_id, - ) - if conn_rec is not None: - break - - # Request Attach - if len(invitation.requests_attach) >= 1 and conn_rec is not None: - req_attach = invitation.requests_attach[0] - if isinstance(req_attach, AttachDecorator): - if req_attach.data is not None: - unq_req_attach_type = DIDCommPrefix.unqualify( - req_attach.content["@type"] - ) - if unq_req_attach_type == PRESENTATION_REQUEST: - await self._process_pres_request_v1( - req_attach=req_attach, - service=service, - conn_rec=conn_rec, - trace=(invitation._trace is not None), - ) - elif unq_req_attach_type == PRES_20_REQUEST: - await self._process_pres_request_v2( - req_attach=req_attach, - service=service, - conn_rec=conn_rec, - trace=(invitation._trace is not None), - ) - elif unq_req_attach_type == CREDENTIAL_OFFER: - if auto_accept or self.profile.settings.get( - "debug.auto_accept_invites" - ): - try: - conn_rec = await asyncio.wait_for( - self.conn_rec_is_active(conn_rec.connection_id), - 7, - ) - except asyncio.TimeoutError: - LOGGER.warning( - "Connection not ready to receive credential, " - f"For connection_id:{conn_rec.connection_id} and " - f"invitation_msg_id {invitation._id}", - ) - await self._process_cred_offer_v1( - req_attach=req_attach, - conn_rec=conn_rec, - trace=(invitation._trace is not None), - ) - elif unq_req_attach_type == CRED_20_OFFER: - if auto_accept or self.profile.settings.get( - "debug.auto_accept_invites" - ): - try: - conn_rec = await asyncio.wait_for( - self.conn_rec_is_active(conn_rec.connection_id), - 7, - ) - except asyncio.TimeoutError: - LOGGER.warning( - "Connection not ready to receive credential, " - f"For connection_id:{conn_rec.connection_id} and " - f"invitation_msg_id {invitation._id}", - ) - await self._process_cred_offer_v2( - req_attach=req_attach, - conn_rec=conn_rec, - trace=(invitation._trace is not None), - ) - else: - raise OutOfBandManagerError( - ( - "Unsupported requests~attach type " - f"{req_attach.content['@type']}: must unqualify to" - f"{PRESENTATION_REQUEST} or {PRES_20_REQUEST}" - f"{CREDENTIAL_OFFER} or {CRED_20_OFFER}" - ) - ) - else: - raise OutOfBandManagerError("requests~attach is not properly formatted") + oob_record = OobRecord( + role=OobRecord.ROLE_RECEIVER, + invi_msg_id=invitation._id, + invitation=invitation, + state=OobRecord.STATE_INITIAL, + connection_id=conn_rec.connection_id if conn_rec else None, + ) - return conn_rec + # Save record + async with self.profile.session() as session: + await oob_record.save(session) - async def _process_pres_request_v1( - self, - req_attach: AttachDecorator, - service: ServiceMessage, - conn_rec: ConnRecord, - trace: bool, - ): - """ - Create exchange for v1 pres request attachment, auto-present if configured. + # Try to reuse the connection. If not accepted sets the conn_rec to None + if conn_rec and not invitation.requests_attach: + oob_record = await self._handle_hanshake_reuse(oob_record, conn_rec) + conn_rec = None - Args: - req_attach: request attachment on invitation - service: service message from invitation - conn_rec: connection record - trace: trace setting for presentation exchange record - """ - pres_mgr = PresentationManager(self.profile) - pres_request_msg = req_attach.content - indy_proof_request = json.loads( - b64_to_bytes( - pres_request_msg["request_presentations~attach"][0]["data"]["base64"] + LOGGER.warning( + f"Connection reuse request finished with state {oob_record.state}" ) - ) - oob_invi_service = service.serialize() - pres_request_msg["~service"] = { - "recipientKeys": oob_invi_service.get("recipientKeys"), - "routingKeys": oob_invi_service.get("routingKeys"), - "serviceEndpoint": oob_invi_service.get("serviceEndpoint"), - } - pres_ex_record = V10PresentationExchange( - connection_id=conn_rec.connection_id, - thread_id=pres_request_msg["@id"], - initiator=V10PresentationExchange.INITIATOR_EXTERNAL, - role=V10PresentationExchange.ROLE_PROVER, - presentation_request=indy_proof_request, - presentation_request_dict=pres_request_msg, - auto_present=self.profile.context.settings.get( - "debug.auto_respond_presentation_request" - ), - trace=trace, - ) - pres_ex_record = await pres_mgr.receive_request(pres_ex_record) - if pres_ex_record.auto_present: - try: - async with self.profile.session() as session: - req_creds = await indy_proof_req_preview2indy_requested_creds( - indy_proof_req=indy_proof_request, - preview=None, - holder=session.inject(IndyHolder), - ) - except ValueError as err: - LOGGER.exception( - "Unable to auto-respond to presentation request " - f"{pres_ex_record.presentation_exchange_id}, prover" - " could still build proof manually" - ) - raise OutOfBandManagerError( - "Cannot auto-respond to presentation request attachment" - ) from err - - (pres_ex_record, presentation_message) = await pres_mgr.create_presentation( - presentation_exchange_record=pres_ex_record, - requested_credentials=req_creds, - comment=( - "auto-presented for proof request nonce={}".format( - indy_proof_request["nonce"] - ) - ), + # If reuse is accepted we can return as the oob exchange is complete + # TODO: update the state to DONE + # TODO: Should we remove the oob record if the reuse has been accepted? + if oob_record.state == OobRecord.STATE_ACCEPTED: + return oob_record + + # Try to create a connection. Either if the reuse failed or we didn't have a connection yet + # Throws an error if connection could not be created + # TODO: do we need to wait for the connection to be active? (see below) + if not conn_rec and invitation.handshake_protocols: + oob_record = await self._perform_handshake( + oob_record=oob_record, + alias=alias, + auto_accept=auto_accept, + mediation_id=mediation_id, ) - responder = self.profile.inject_or(BaseResponder) - if responder: - await responder.send( - message=presentation_message, - target_list=await self.fetch_connection_targets( - connection=conn_rec - ), - ) - else: - raise OutOfBandManagerError( - ( - "Configuration sets auto_present false: cannot " - "respond automatically to presentation requests" + LOGGER.debug( + f"Performed handshake with connection {oob_record.connection_id}" + ) + # re-fetch connection record + async with self.profile.session() as session: + conn_rec = await ConnRecord.retrieve_by_id( + session, oob_record.connection_id ) + + # Handle any attachments + if invitation.requests_attach: + LOGGER.debug( + f"Process attached messages for oob exchange {oob_record.oob_id} (connection_id {oob_record.connection_id})" ) + if oob_record.connection_id: + # Wait for connection to become active. + # FIXME: this should ideally be handled using an event handler. Once the connection is ready + # we start processing the attached messages. For now we use the timeout method + # TODO: what if not ready within the timeout? + await self._wait_for_conn_rec_active(oob_record.connection_id) + if not conn_rec: + # Create and store new key for connectionless exchange + async with self.profile.session() as session: + wallet = session.inject(BaseWallet) + connection_key = await wallet.create_signing_key(KeyType.ED25519) + oob_record.our_recipient_key = connection_key.verkey + await oob_record.save(session) - async def _process_pres_request_v2( - self, - req_attach: AttachDecorator, - service: ServiceMessage, - conn_rec: ConnRecord, - trace: bool, - ): + await self._respond_request_attach(oob_record) + + # TODO: remove record? not possible with connectionless + oob_record.state = OobRecord.STATE_DONE + + return oob_record + + async def _respond_request_attach(self, oob_record: OobRecord): + invitation = oob_record.invitation + req_attach = invitation.requests_attach[0] + + if not isinstance(req_attach, AttachDecorator): + raise OutOfBandManagerError("requests~attach is not properly formatted") + + message_processor = self.profile.inject(OobMessageProcessor) + + LOGGER.warning("Handle inbound oob message") + + # TODO: should we add somethign to get the outcome of processing the message? + # Success will happen through protocol specific webhooks + await message_processor.handle_message( + self.profile, + req_attach.content, + oob_record=oob_record, + ) + + async def _wait_for_reuse_response( + self, oob_id: str, timeout: int = 15 + ) -> OobRecord: """ - Create exchange for v2 pres request attachment, auto-present if configured. + Wait for reuse response message state. Either by receiving a reuse accepted or problem + report. If no answer is received withing the timeout, the state will be set to reuse_not_acceted Args: - req_attach: request attachment on invitation - service: service message from invitation - conn_rec: connection record - trace: trace setting for presentation exchange record + oob_id: Identifier of the oob record + timeout: The timeout in seconds to wait for the reuse state [default=15] + + Returns: + """ - pres_mgr = V20PresManager(self.profile) - pres_request_msg = req_attach.content - oob_invi_service = service.serialize() - pres_request_msg["~service"] = { - "recipientKeys": oob_invi_service.get("recipientKeys"), - "routingKeys": oob_invi_service.get("routingKeys"), - "serviceEndpoint": oob_invi_service.get("serviceEndpoint"), - } - pres_ex_record = V20PresExRecord( - connection_id=conn_rec.connection_id, - thread_id=pres_request_msg["@id"], - initiator=V20PresExRecord.INITIATOR_EXTERNAL, - role=V20PresExRecord.ROLE_PROVER, - pres_request=pres_request_msg, - auto_present=self.profile.context.settings.get( - "debug.auto_respond_presentation_request" - ), - trace=trace, + OOB_REUSE_RESPONSE_STATE = re.compile( + "^acapy::record::out_of_band::(reuse_accepted|reuse_not_accepted)$" ) - pres_ex_record = await pres_mgr.receive_pres_request(pres_ex_record) - if pres_ex_record.auto_present: - (pres_ex_record, pres_msg) = await pres_mgr.create_pres( - pres_ex_record=pres_ex_record, - comment=( - f"auto-presented for proof requests" - f", pres_ex_record: {pres_ex_record.pres_ex_id}" - ), - ) - responder = self.profile.inject_or(BaseResponder) - if responder: - await responder.send( - message=pres_msg, - target_list=await self.fetch_connection_targets( - connection=conn_rec - ), - ) - else: - raise OutOfBandManagerError( - ( - "Configuration set auto_present false: cannot " - "respond automatically to presentation requests" - ) - ) + async def _wait_for_state() -> OobRecord: + event = self.profile.inject(EventBus) + with event.wait_for_event( + self.profile, + OOB_REUSE_RESPONSE_STATE, + lambda event: event.payload.get("oob_id") == oob_id, + ) as await_event: + # After starting the listener first retrieve the record from storage. + # This rules out the scenario where the record was in the desired state + # Before starting the event listener + async with self.profile.session() as session: + oob_record = await OobRecord.retrieve_by_id(session, oob_id) - async def _process_cred_offer_v1( - self, - req_attach: AttachDecorator, - conn_rec: ConnRecord, - trace: bool, - ): - """ - Create exchange for v1 cred offer attachment, auto-offer if configured. + if oob_record.state in [ + OobRecord.STATE_ACCEPTED, + OobRecord.STATE_NOT_ACCEPTED, + ]: + return oob_record - Args: - req_attach: request attachment on invitation - service: service message from invitation - conn_rec: connection record - """ - cred_mgr = V10CredManager(self.profile) - cred_offer_msg = req_attach.content - cred_offer = V10CredOffer.deserialize(cred_offer_msg) - cred_offer.assign_trace_decorator(self.profile.settings, trace) - # receive credential offer - cred_ex_record = await cred_mgr.receive_offer( - message=cred_offer, connection_id=conn_rec.connection_id + LOGGER.debug(f"Wait for oob {oob_id} to receive reuse accepted mesage") + # Wait for oob_record to have reuse_accepted state + event = await await_event + return OobRecord.deserialize(event.payload) + + try: + oob_record = await asyncio.wait_for( + _wait_for_state(), + timeout, + ) + + return oob_record + except asyncio.TimeoutError: + async with self.profile.session() as session: + oob_record = await OobRecord.retrieve_by_id(session, oob_id) + return oob_record + + async def _wait_for_conn_rec_active( + self, connection_id: str, timeout: int = 7 + ) -> Optional[ConnRecord]: + CONNECTION_READY_EVENT = re.compile( + "^acapy::record::connections::(active|completed|response)$" ) - if self.profile.context.settings.get("debug.auto_respond_credential_offer"): - if conn_rec.is_ready: - (_, cred_request_message) = await cred_mgr.create_request( - cred_ex_record=cred_ex_record, - holder_did=conn_rec.my_did, - ) - responder = self.profile.inject_or(BaseResponder) - if responder: - await responder.send( - message=cred_request_message, - target_list=await self.fetch_connection_targets( - connection=conn_rec - ), + + LOGGER.debug(f"Wait for connection {connection_id} to become active") + + async def _wait_for_state() -> ConnRecord: + event = self.profile.inject(EventBus) + with event.wait_for_event( + self.profile, + CONNECTION_READY_EVENT, + lambda event: event.payload.get("connection_id") == connection_id, + ) as await_event: + # After starting the listener first retrieve the record from storage. + # This rules out the scenario where the record was in the desired state + # Before starting the event listener + async with self.profile.session() as session: + conn_record = await ConnRecord.retrieve_by_id( + session, connection_id ) - else: - raise OutOfBandManagerError( - ( - "Configuration sets auto_offer false: cannot " - "respond automatically to credential offers" - ) + if conn_record.is_ready: + return conn_record + + LOGGER.debug(f"Wait for connection {connection_id} to become active") + # Wait for connection record to be in state + event = await await_event + return ConnRecord.deserialize(event.payload) + + try: + return await asyncio.wait_for( + _wait_for_state(), + timeout, ) - async def _process_cred_offer_v2( - self, - req_attach: AttachDecorator, - conn_rec: ConnRecord, - trace: bool, - ): - """ - Create exchange for v1 cred offer attachment, auto-offer if configured. + except asyncio.TimeoutError: + LOGGER.warning(f"Connection for connection_id {connection_id} not ready") + return None - Args: - req_attach: request attachment on invitation - service: service message from invitation - conn_rec: connection record - """ - cred_mgr = V20CredManager(self.profile) - cred_offer_msg = req_attach.content - cred_offer = V20CredOffer.deserialize(cred_offer_msg) - cred_offer.assign_trace_decorator(self.profile.settings, trace) + async def _handle_hanshake_reuse( + self, oob_record: OobRecord, conn_record: ConnRecord + ) -> OobRecord: + # Send handshake reuse + oob_record = await self._create_handshake_reuse_message(oob_record, conn_record) - cred_ex_record = await cred_mgr.receive_offer( - cred_offer_message=cred_offer, connection_id=conn_rec.connection_id + # Wait for the reuse accepted message + oob_record = await self._wait_for_reuse_response(oob_record.oob_id) + LOGGER.debug( + f"Oob reuse for oob id {oob_record.oob_id} with connection {oob_record.connection_id} finished with state {oob_record.state}" ) - if self.profile.context.settings.get("debug.auto_respond_credential_offer"): - if conn_rec.is_ready: - (_, cred_request_message) = await cred_mgr.create_request( - cred_ex_record=cred_ex_record, - holder_did=conn_rec.my_did, - ) - responder = self.profile.inject_or(BaseResponder) - if responder: - await responder.send( - message=cred_request_message, - target_list=await self.fetch_connection_targets( - connection=conn_rec - ), - ) - else: - raise OutOfBandManagerError( - ( - "Configuration sets auto_offer false: cannot " - "respond automatically to credential offers" - ) + + if oob_record.state != OobRecord.STATE_ACCEPTED: + # Remove associated connection id as reuse has ben denied + oob_record.connection_id = None + + # TODO: replace webhook event with new oob webhook event + # Emit webhook if the reuse was not accepted + await self.profile.notify( + REUSE_ACCEPTED_WEBHOOK_TOPIC, + { + "thread_id": oob_record.reuse_msg_id, + "connection_id": conn_record.connection_id, + "state": "rejected", + "comment": ( + "No HandshakeReuseAccept message received, " + f"connection {conn_record.connection_id} ", + f"and invitation {oob_record.invitation._id}", + ), + }, ) - async def check_reuse_msg_state( + async with self.profile.session() as session: + await oob_record.save(session) + + return oob_record + + async def _perform_handshake( self, - conn_rec: ConnRecord, - ): - """ - Check reuse message state from the ConnRecord Metadata. + *, + oob_record: OobRecord, + alias: Optional[str] = None, + auto_accept: Optional[bool] = None, + mediation_id: Optional[str] = None, + ) -> OobRecord: + invitation = oob_record.invitation + + supported_handshake_protocols = [ + HSProto.get(hsp) + for hsp in dict.fromkeys( + [ + DIDCommPrefix.unqualify(proto) + for proto in invitation.handshake_protocols + ] + ) + ] - Args: - conn_rec: The required ConnRecord with updated metadata + # Get the single service item + service = invitation.services[0] + public_did = None + if isinstance(service, str): + # If it's in the did format, we need to convert to a full service block + # An existing connection can only be reused based on a public DID + # in an out-of-band message (RFC 0434). - Returns: + public_did = service.split(":")[-1] - """ - received = False - while not received: - msg_state = None - async with self.profile.session() as session: - msg_state = await conn_rec.metadata_get(session, "reuse_msg_state") - if msg_state != "initial": - received = True - return + # TODO: resolve_invitation should resolve key_info objects + # or something else that includes the key type. We now assume + # ED25519 keys + endpoint, recipient_keys, routing_keys = await self.resolve_invitation( + service + ) + service = ServiceMessage.deserialize( + { + "id": "#inline", + "type": "did-communication", + "recipientKeys": [ + DIDKey.from_public_key_b58(key, KeyType.ED25519).did + for key in recipient_keys + ], + "routingKeys": [ + DIDKey.from_public_key_b58(key, KeyType.ED25519).did + for key in routing_keys + ], + "serviceEndpoint": endpoint, + } + ) - async def conn_rec_is_active(self, conn_rec_id: str) -> ConnRecord: - """ - Return when ConnRecord state becomes active. + LOGGER.debug(f"Creating connection with public did {public_did}") + + conn_record = None + for protocol in supported_handshake_protocols: + # DIDExchange + if protocol is HSProto.RFC23: + didx_mgr = DIDXManager(self.profile) + conn_record = await didx_mgr.receive_invitation( + invitation=invitation, + their_public_did=public_did, + auto_accept=auto_accept, + alias=alias, + mediation_id=mediation_id, + ) + break + # 0160 Connection + elif protocol is HSProto.RFC160: + service.recipient_keys = [ + DIDKey.from_did(key).public_key_b58 + for key in service.recipient_keys or [] + ] + service.routing_keys = [ + DIDKey.from_did(key).public_key_b58 for key in service.routing_keys + ] or [] + connection_invitation = ConnectionInvitation.deserialize( + { + "@id": invitation._id, + "@type": DIDCommPrefix.qualify_current(protocol.name), + "label": invitation.label, + "recipientKeys": service.recipient_keys, + "serviceEndpoint": service.service_endpoint, + "routingKeys": service.routing_keys, + } + ) + conn_mgr = ConnectionManager(self.profile) + conn_record = await conn_mgr.receive_invitation( + invitation=connection_invitation, + their_public_did=public_did, + auto_accept=auto_accept, + alias=alias, + mediation_id=mediation_id, + ) + break - Args: - conn_rec: ConnRecord + if not conn_record: + raise OutOfBandManagerError( + f"Unable to create connection. Could not perform handshake using any of the handshake_protocols (supported {supported_handshake_protocols})" + ) - Returns: - ConnRecord + async with self.profile.session() as session: + oob_record.connection_id = conn_record.connection_id + await oob_record.save(session) - """ - while True: - async with self.profile.session() as session: - conn_rec = await ConnRecord.retrieve_by_id(session, conn_rec_id) - if conn_rec.is_ready: - return conn_rec - await asyncio.sleep(0.5) + return oob_record - async def create_handshake_reuse_message( + async def _create_handshake_reuse_message( self, - invi_msg: InvitationMessage, + oob_record: OobRecord, conn_record: ConnRecord, - ) -> None: + ) -> OobRecord: """ Create and Send a Handshake Reuse message under RFC 0434. Args: - invi_msg: OOB Invitation Message - service: Service block extracted from the OOB invitation + oob_record: OOB Record + conn_record: Connection record associated with the oob record Returns: @@ -977,26 +804,26 @@ async def create_handshake_reuse_message( """ try: - pthid = invi_msg._id reuse_msg = HandshakeReuse() - thid = reuse_msg._id - reuse_msg.assign_thread_id(thid=thid, pthid=pthid) + reuse_msg.assign_thread_id(thid=reuse_msg._id, pthid=oob_record.invi_msg_id) + connection_targets = await self.fetch_connection_targets( connection=conn_record ) - responder = self.profile.inject_or(BaseResponder) - if responder: - await responder.send( - message=reuse_msg, - target_list=connection_targets, - ) - async with self.profile.session() as session: - await conn_record.metadata_set( - session=session, key="reuse_msg_id", value=reuse_msg._id - ) - await conn_record.metadata_set( - session=session, key="reuse_msg_state", value="initial" - ) + + responder = self.profile.inject(BaseResponder) + await responder.send( + message=reuse_msg, + target_list=connection_targets, + ) + + async with self.profile.session() as session: + oob_record.reuse_msg_id = reuse_msg._id + oob_record.state = OobRecord.STATE_AWAIT_RESPONSE + await oob_record.save(session, reason="Storing reuse msg data") + + return oob_record + except Exception as err: raise OutOfBandManagerError( f"Error on creating and sending a handshake reuse message: {err}" @@ -1004,11 +831,11 @@ async def create_handshake_reuse_message( async def delete_stale_connection_by_invitation(self, invi_msg_id: str): """Delete unused connections, using existing an active connection instead.""" - tag_filter = {} - post_filter = {} - tag_filter["invitation_msg_id"] = invi_msg_id - post_filter["invitation_mode"] = "once" - post_filter["state"] = "invitation" + tag_filter = { + "invitation_msg_id": invi_msg_id, + } + post_filter = {"invitation_mode": "once", "state": "invitation"} + async with self.profile.session() as session: conn_records = await ConnRecord.query( session, @@ -1042,20 +869,30 @@ async def receive_reuse_message( """ invi_msg_id = reuse_msg._thread.pthid - reuse_msg_id = reuse_msg._thread.thid - responder = self.profile.inject_or(BaseResponder) + reuse_msg_id = reuse_msg._thread_id + reuse_accept_msg = HandshakeReuseAccept() reuse_accept_msg.assign_thread_id(thid=reuse_msg_id, pthid=invi_msg_id) connection_targets = await self.fetch_connection_targets(connection=conn_rec) - if responder: - await responder.send( - message=reuse_accept_msg, - target_list=connection_targets, - ) + + responder = self.profile.inject(BaseResponder) + # Update ConnRecord's invi_msg_id async with self._profile.session() as session: + oob_record = await OobRecord.retrieve_by_tag_filter( + session, + {"invi_msg_id": invi_msg_id}, + {"state": OobRecord.STATE_AWAIT_RESPONSE}, + ) + + oob_record.state = OobRecord.STATE_ACCEPTED + oob_record.reuse_msg_id = reuse_msg_id + oob_record.connection_id = conn_rec.connection_id + await oob_record.save(session) + conn_rec.invitation_msg_id = invi_msg_id await conn_rec.save(session, reason="Assigning new invitation_msg_id") + # Delete the ConnRecord created; re-use existing connection await self.delete_stale_connection_by_invitation(invi_msg_id) # Emit webhook @@ -1065,12 +902,17 @@ async def receive_reuse_message( "thread_id": reuse_msg_id, "connection_id": conn_rec.connection_id, "comment": ( - f"Connection {conn_rec.connection_id} is being reused ", - f"for invitation {invi_msg_id}", + f"Connection {conn_rec.connection_id} is being reused " + f"for invitation {invi_msg_id}" ), }, ) + await responder.send( + message=reuse_accept_msg, + target_list=connection_targets, + ) + async def receive_reuse_accepted_message( self, reuse_accepted_msg: HandshakeReuseAccept, @@ -1080,7 +922,7 @@ async def receive_reuse_accepted_message( """ Receive and process a HandshakeReuseAccept message under RFC 0434. - Process a `HandshakeReuseAccept` message by updating the ConnRecord metadata + Process a `HandshakeReuseAccept` message by updating the OobRecord state to `accepted`. Args: @@ -1096,19 +938,20 @@ async def receive_reuse_accepted_message( """ invi_msg_id = reuse_accepted_msg._thread.pthid thread_reuse_msg_id = reuse_accepted_msg._thread.thid + try: async with self.profile.session() as session: - conn_reuse_msg_id = await conn_record.metadata_get( - session=session, key="reuse_msg_id" - ) - assert thread_reuse_msg_id == conn_reuse_msg_id - await conn_record.metadata_set( - session=session, key="reuse_msg_state", value="accepted" + oob_record = await OobRecord.retrieve_by_tag_filter( + session, + {"invi_msg_id": invi_msg_id, "reuse_msg_id": thread_reuse_msg_id}, ) + + oob_record.state = OobRecord.STATE_ACCEPTED conn_record.invitation_msg_id = invi_msg_id await conn_record.save( session, reason="Assigning new invitation_msg_id" ) + await oob_record.save(session, reason="Reuse accepted") # Emit webhook await self.profile.notify( REUSE_ACCEPTED_WEBHOOK_TOPIC, @@ -1117,8 +960,8 @@ async def receive_reuse_accepted_message( "connection_id": conn_record.connection_id, "state": "accepted", "comment": ( - f"Connection {conn_record.connection_id} is being reused ", - f"for invitation {invi_msg_id}", + f"Connection {conn_record.connection_id} is being reused " + f"for invitation {invi_msg_id}" ), }, ) @@ -1132,8 +975,8 @@ async def receive_reuse_accepted_message( "state": "rejected", "comment": ( "Unable to process HandshakeReuseAccept message, " - f"connection {conn_record.connection_id} ", - f"and invitation {invi_msg_id}", + f"connection {conn_record.connection_id} " + f"and invitation {invi_msg_id}" ), }, ) @@ -1155,7 +998,7 @@ async def receive_problem_report( """ Receive and process a ProblemReport message from the inviter to invitee. - Process a `ProblemReport` message by updating the ConnRecord metadata + Process a `ProblemReport` message by updating the OobRecord state to `not_accepted`. Args: @@ -1169,17 +1012,16 @@ async def receive_problem_report( HandshakeReuseAccept message """ + invi_msg_id = problem_report._thread.pthid + thread_reuse_msg_id = problem_report._thread.thid try: - invi_msg_id = problem_report._thread.pthid - thread_reuse_msg_id = problem_report._thread.thid async with self.profile.session() as session: - conn_reuse_msg_id = await conn_record.metadata_get( - session=session, key="reuse_msg_id" - ) - assert thread_reuse_msg_id == conn_reuse_msg_id - await conn_record.metadata_set( - session=session, key="reuse_msg_state", value="not_accepted" + oob_record = await OobRecord.retrieve_by_tag_filter( + session, + {"invi_msg_id": invi_msg_id, "reuse_msg_id": thread_reuse_msg_id}, ) + oob_record.state = OobRecord.STATE_NOT_ACCEPTED + await oob_record.save(session) except Exception as e: raise OutOfBandManagerError( ( diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py index 513fceb9c1..062466f05c 100644 --- a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py +++ b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py @@ -35,6 +35,7 @@ def __init__( invi_msg_id: str = None, invitation: Union[InvitationMessage, Mapping] = None, # invitation message invitation_url: str = None, + oob_id: str = None, public_did: str = None, # backward-compat: BaseRecord.from_storage() trace: bool = False, **kwargs, @@ -46,6 +47,7 @@ def __init__( self.invi_msg_id = invi_msg_id self._invitation = InvitationMessage.serde(invitation) self.invitation_url = invitation_url + self.oob_id = oob_id self.trace = trace @property @@ -69,11 +71,7 @@ def record_value(self) -> dict: return { **{ prop: getattr(self, prop) - for prop in ( - "invitation_url", - "state", - "trace", - ) + for prop in ("invitation_url", "state", "trace", "oob_id") }, **{ prop: getattr(self, f"_{prop}").ser @@ -110,6 +108,11 @@ class Meta: description="Invitation message identifier", example=UUIDFour.EXAMPLE, ) + oob_id = fields.Str( + required=False, + description="Out of band record identifier", + example=UUIDFour.EXAMPLE, + ) invitation = fields.Nested( InvitationMessageSchema(), required=False, diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/models/oob_record.py b/aries_cloudagent/protocols/out_of_band/v1_0/models/oob_record.py new file mode 100644 index 0000000000..8110a4dea9 --- /dev/null +++ b/aries_cloudagent/protocols/out_of_band/v1_0/models/oob_record.py @@ -0,0 +1,172 @@ +"""Record for out of band invitations.""" + +from typing import Any, Mapping, Optional, Union + +from marshmallow import fields + +from aries_cloudagent.messaging.decorators.service_decorator import ( + ServiceDecorator, + ServiceDecoratorSchema, +) + +from .....messaging.models.base_record import BaseExchangeRecord, BaseExchangeSchema +from .....messaging.valid import UUIDFour + +from ..messages.invitation import InvitationMessage, InvitationMessageSchema + + +class OobRecord(BaseExchangeRecord): + """Represents an out of band record.""" + + class Meta: + """OobRecord metadata.""" + + schema_class = "OobRecordSchema" + + RECORD_TYPE = "oob_record" + RECORD_ID_NAME = "oob_id" + RECORD_TOPIC = "oob_record" + TAG_NAMES = { + "invi_msg_id", + "attach_thread_id", + "our_recipient_key", + "connection_id", + "reuse_msg_id", + } + + STATE_INITIAL = "initial" + STATE_AWAIT_RESPONSE = "await_response" + STATE_NOT_ACCEPTED = "reuse_not_accepted" + STATE_ACCEPTED = "reuse_accepted" + STATE_DONE = "done" + + ROLE_SENDER = "sender" + ROLE_RECEIVER = "receiver" + + def __init__( + self, + *, + state: str, + invi_msg_id: str, + role: str, + invitation: Union[InvitationMessage, Mapping[str, Any]], + their_service: Optional[ServiceDecorator] = None, + connection_id: Optional[str] = None, + reuse_msg_id: Optional[str] = None, + oob_id: Optional[str] = None, + attach_thread_id: Optional[str] = None, + our_recipient_key: Optional[str] = None, + trace: bool = False, + **kwargs, + ): + """Initialize a new OobRecord.""" + super().__init__(oob_id, state, trace=trace, **kwargs) + self._id = oob_id + self.state = state + self.invi_msg_id = invi_msg_id + self.role = role + self._invitation = InvitationMessage.serde(invitation) + self.connection_id = connection_id + self.reuse_msg_id = reuse_msg_id + self.their_service = their_service + self.attach_thread_id = attach_thread_id + self.our_recipient_key = our_recipient_key + self.trace = trace + + @property + def oob_id(self) -> str: + """Accessor for the ID associated with this exchange.""" + return self._id + + @property + def invitation(self) -> InvitationMessage: + """Accessor; get deserialized view.""" + return None if self._invitation is None else self._invitation.de + + @invitation.setter + def invitation(self, value): + """Setter; store de/serialized views.""" + self._invitation = InvitationMessage.serde(value) + + @property + def record_value(self) -> dict: + """Accessor for the JSON record value generated for this invitation.""" + return { + **{ + prop: getattr(self, prop) + for prop in ( + "state", + "their_service", + "connection_id", + "role", + ) + }, + **{ + prop: getattr(self, f"_{prop}").ser + for prop in ("invitation",) + if getattr(self, prop) is not None + }, + } + + def __eq__(self, other: Any) -> bool: + """Comparison between records.""" + return super().__eq__(other) + + +class OobRecordSchema(BaseExchangeSchema): + """Schema to allow serialization/deserialization of invitation records.""" + + class Meta: + """OobRecordSchema metadata.""" + + model_class = OobRecord + + oob_id = fields.Str( + required=True, + description="Oob record identifier", + example=UUIDFour.EXAMPLE, + ) + state = fields.Str( + required=True, + description="Out of band message exchange state", + example=OobRecord.STATE_AWAIT_RESPONSE, + ) + invi_msg_id = fields.Str( + required=True, + description="Invitation message identifier", + example=UUIDFour.EXAMPLE, + ) + invitation = fields.Nested( + InvitationMessageSchema(), + required=True, + description="Out of band invitation message", + ) + + their_service = fields.Nested( + ServiceDecoratorSchema(), + required=False, + ) + + connection_id = fields.Str( + description="Connection record identifier", + required=False, + example=UUIDFour.EXAMPLE, + ) + + attach_thread_id = fields.Str( + description="Connection record identifier", + required=False, + example=UUIDFour.EXAMPLE, + ) + + our_recipient_key = fields.Str( + description="Recipient key used for oob invitation", + required=False, + example=UUIDFour.EXAMPLE, + ) + + role = fields.Str( + description="OOB Role", + required=False, + example=OobRecord.ROLE_RECEIVER, + ) diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/routes.py b/aries_cloudagent/protocols/out_of_band/v1_0/routes.py index a4cca26875..f96f859f86 100644 --- a/aries_cloudagent/protocols/out_of_band/v1_0/routes.py +++ b/aries_cloudagent/protocols/out_of_band/v1_0/routes.py @@ -177,7 +177,7 @@ async def invitation_create(request: web.BaseRequest): mediation_id=mediation_id, ) except (StorageNotFoundError, ValidationError, OutOfBandManagerError) as e: - raise web.HTTPBadRequest(reason=str(e)) + raise web.HTTPBadRequest(reason=e.roll_up) return web.json_response(invi_rec.serialize()) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py index 0a1d01232c..bf00d19580 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py @@ -1,5 +1,6 @@ """Presentation ack message handler.""" +from .....core.oob_processor import OobMessageProcessor from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.request_context import RequestContext from .....messaging.responder import BaseResponder @@ -29,8 +30,19 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for presentation ack") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for presentation ack not ready") + + # Find associated oob record + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for presentation ack" + ) presentation_manager = PresentationManager(context.profile) await presentation_manager.receive_presentation_ack( diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py index 40c4d73dc6..fdceec89c2 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py @@ -1,7 +1,8 @@ """Presentation message handler.""" +from .....core.oob_processor import OobMessageProcessor from .....ledger.error import LedgerError -from .....messaging.base_handler import BaseHandler +from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.models.base import BaseModelError from .....messaging.request_context import RequestContext from .....messaging.responder import BaseResponder @@ -35,10 +36,24 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for presentation not ready") + + # Find associated oob record. If the proof request was created as an oob attachment + # the presentation exchange record won't have a connection id (yet) + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for presentation" + ) presentation_manager = PresentationManager(profile) presentation_exchange_record = await presentation_manager.receive_presentation( - context.message, context.connection_record + context.message, context.connection_record, oob_record ) # mgr saves record state null if need be and possible r_time = trace_event( diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_problem_report_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_problem_report_handler.py index e20d496c01..fb5874145a 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_problem_report_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_problem_report_handler.py @@ -1,6 +1,6 @@ """Presentation problem report message handler.""" -from .....messaging.base_handler import BaseHandler +from .....messaging.base_handler import BaseHandler, HandlerException from .....messaging.request_context import RequestContext from .....messaging.responder import BaseResponder from .....storage.error import StorageError, StorageNotFoundError @@ -26,6 +26,16 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) assert isinstance(context.message, PresentationProblemReport) + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException( + "Connection used for presentation problem report not ready" + ) + elif not context.connection_record: + raise HandlerException( + "Connectionless not supported for presentation problem report" + ) + presentation_manager = PresentationManager(context.profile) try: await presentation_manager.receive_problem_report( diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py index 89708a4368..7977626453 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py @@ -37,9 +37,14 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: + if not context.connection_record: raise HandlerException( - "No connection established for presentation proposal" + "Connectionless not supported for presentation proposal" + ) + # If connection is present it must be ready for use + elif not context.connection_ready: + raise HandlerException( + "Connection used for presentation proposal not ready" ) presentation_manager = PresentationManager(profile) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py index 21979940ee..581de65553 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py @@ -1,5 +1,6 @@ """Presentation request message handler.""" +from .....core.oob_processor import OobMessageProcessor from .....indy.holder import IndyHolder, IndyHolderError from .....indy.models.xform import indy_proof_req_preview2indy_requested_creds from .....ledger.error import LedgerError @@ -40,8 +41,25 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message.serialize(as_string=True), ) - if not context.connection_ready: - raise HandlerException("No connection established for presentation request") + # If connection is present it must be ready for use + if context.connection_record and not context.connection_ready: + raise HandlerException("Connection used for presentation request not ready") + + # Find associated oob record + oob_processor = context.inject(OobMessageProcessor) + oob_record = await oob_processor.find_oob_record_for_inbound_message(context) + + # Either connection or oob context must be present + if not context.connection_record and not oob_record: + raise HandlerException( + "No connection or associated connectionless exchange found for presentation request" + ) + + connection_id = ( + context.connection_record.connection_id + if context.connection_record + else None + ) presentation_manager = PresentationManager(profile) @@ -56,11 +74,14 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) = await V10PresentationExchange.retrieve_by_tag_filter( session, {"thread_id": context.message._thread_id}, - {"connection_id": context.connection_record.connection_id}, + { + "role": V10PresentationExchange.ROLE_PROVER, + "connection_id": connection_id, + }, ) # holder initiated via proposal except StorageNotFoundError: # verifier sent this request free of any proposal presentation_exchange_record = V10PresentationExchange( - connection_id=context.connection_record.connection_id, + connection_id=connection_id, thread_id=context.message._thread_id, initiator=V10PresentationExchange.INITIATOR_EXTERNAL, role=V10PresentationExchange.ROLE_PROVER, diff --git a/aries_cloudagent/protocols/present_proof/v1_0/manager.py b/aries_cloudagent/protocols/present_proof/v1_0/manager.py index 10354d3d29..5b5cf4a909 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/manager.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/manager.py @@ -2,14 +2,15 @@ import json import logging +from typing import Optional +from ...out_of_band.v1_0.models.oob_record import OobRecord from ....connections.models.conn_record import ConnRecord from ....core.error import BaseError from ....core.profile import Profile from ....indy.verifier import IndyVerifier from ....messaging.decorators.attach_decorator import AttachDecorator from ....messaging.responder import BaseResponder -from ....storage.error import StorageNotFoundError from ..indy.pres_exch_handler import IndyPresExchHandler @@ -276,7 +277,10 @@ async def create_presentation( ], ) - presentation_message._thread = {"thid": presentation_exchange_record.thread_id} + # Assign thid (and optionally pthid) to message + presentation_message.assign_thread_from( + presentation_exchange_record.presentation_request_dict + ) presentation_message.assign_trace_decorator( self._profile.settings, presentation_exchange_record.trace ) @@ -294,7 +298,10 @@ async def create_presentation( return presentation_exchange_record, presentation_message async def receive_presentation( - self, message: Presentation, connection_record: ConnRecord + self, + message: Presentation, + connection_record: Optional[ConnRecord], + oob_record: Optional[OobRecord], ): """ Receive a presentation, from message in context on manager creation. @@ -306,25 +313,27 @@ async def receive_presentation( presentation = message.indy_proof() thread_id = message._thread_id - connection_id_filter = ( - {"connection_id": connection_record.connection_id} - if connection_record is not None - else None - ) + # connection_id is None in the record if this is in response to + # a request~attach from an OOB message. If so, we do not want to filter + # the record by connection_id. + connection_id = None if oob_record else connection_record.connection_id + async with self._profile.session() as session: - try: - ( - presentation_exchange_record - ) = await V10PresentationExchange.retrieve_by_tag_filter( - session, {"thread_id": thread_id}, connection_id_filter - ) - except StorageNotFoundError: - # Proof Request not bound to any connection: requests_attach in OOB msg - ( - presentation_exchange_record - ) = await V10PresentationExchange.retrieve_by_tag_filter( - session, {"thread_id": thread_id}, None + # Find by thread_id and role. Verify connection id later + presentation_exchange_record = ( + await V10PresentationExchange.retrieve_by_tag_filter( + session, + {"thread_id": thread_id}, + { + "role": V10PresentationExchange.ROLE_VERIFIER, + "connection_id": connection_id, + }, ) + ) + + # Save connection id (if it wasn't already present) + if connection_record: + presentation_exchange_record.connection_id = connection_record.connection_id # Check for bait-and-switch in presented attribute values vs. proposal if presentation_exchange_record.presentation_proposal_dict: @@ -442,6 +451,7 @@ async def send_presentation_ack( await responder.send_reply( presentation_ack_message, + # connection_id can be none in case of connectionless connection_id=presentation_exchange_record.connection_id, ) else: @@ -451,7 +461,7 @@ async def send_presentation_ack( ) async def receive_presentation_ack( - self, message: PresentationAck, connection_record: ConnRecord + self, message: PresentationAck, connection_record: Optional[ConnRecord] ): """ Receive a presentation ack, from message in context on manager creation. @@ -460,13 +470,19 @@ async def receive_presentation_ack( presentation exchange record, retrieved and updated """ + connection_id = connection_record.connection_id if connection_record else None + async with self._profile.session() as session: ( presentation_exchange_record ) = await V10PresentationExchange.retrieve_by_tag_filter( session, {"thread_id": message._thread_id}, - {"connection_id": connection_record.connection_id}, + { + # connection_id can be null in connectionless + "connection_id": connection_id, + "role": V10PresentationExchange.ROLE_PROVER, + }, ) presentation_exchange_record.state = ( diff --git a/aries_cloudagent/protocols/present_proof/v1_0/routes.py b/aries_cloudagent/protocols/present_proof/v1_0/routes.py index ddc90d4ef3..985ce700db 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/routes.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/routes.py @@ -496,8 +496,6 @@ async def presentation_exchange_create_request(request: web.BaseRequest): # other party does not care about our false protocol start raise web.HTTPBadRequest(reason=err.roll_up) - await outbound_handler(presentation_request_message, connection_id=None) - trace_event( context.settings, presentation_request_message, diff --git a/aries_cloudagent/transport/inbound/receipt.py b/aries_cloudagent/transport/inbound/receipt.py index 653bbc37d0..f36412458e 100644 --- a/aries_cloudagent/transport/inbound/receipt.py +++ b/aries_cloudagent/transport/inbound/receipt.py @@ -1,6 +1,7 @@ """Classes for representing message receipt details.""" from datetime import datetime +from typing import Optional class MessageReceipt: @@ -25,6 +26,7 @@ def __init__( sender_did: str = None, sender_verkey: str = None, thread_id: str = None, + parent_thread_id: str = None, ): """Initialize the message delivery instance.""" self._connection_id = connection_id @@ -37,6 +39,7 @@ def __init__( self._sender_did = sender_did self._sender_verkey = sender_verkey self._thread_id = thread_id + self._parent_thread_id = parent_thread_id @property def connection_id(self) -> str: @@ -266,6 +269,28 @@ def thread_id(self, thread: str): """ self._thread_id = thread + @property + def parent_thread_id(self) -> Optional[str]: + """ + Accessor for the identifier of the message parent thread. + + Returns: + The delivery parent thread ID + + """ + return self._parent_thread_id + + @parent_thread_id.setter + def parent_thread_id(self, thread: Optional[str]): + """ + Setter for the message parent thread identifier. + + Args: + thread: The new parent thread identifier + + """ + self._parent_thread_id = thread + def __repr__(self) -> str: """ Provide a human readable representation of this object. diff --git a/aries_cloudagent/transport/pack_format.py b/aries_cloudagent/transport/pack_format.py index 2423e47a40..f982748098 100644 --- a/aries_cloudagent/transport/pack_format.py +++ b/aries_cloudagent/transport/pack_format.py @@ -91,6 +91,7 @@ async def parse_message( receipt.thread_id = ( thread_dec and thread_dec.get("thid") or message_dict.get("@id") ) + receipt.parent_thread_id = thread_dec.get("pthid") if thread_dec else None # handle transport decorator transport_dec = message_dict.get("~transport") diff --git a/aries_cloudagent/transport/wire_format.py b/aries_cloudagent/transport/wire_format.py index d03a8eb40a..f70521dd4a 100644 --- a/aries_cloudagent/transport/wire_format.py +++ b/aries_cloudagent/transport/wire_format.py @@ -134,6 +134,7 @@ async def parse_message( receipt.thread_id = ( thread_dec and thread_dec.get("thid") or message_dict.get("@id") ) + receipt.parent_thread_id = thread_dec.get("pthid") if thread_dec else None # handle transport decorator transport_dec = message_dict.get("~transport")