Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete unused ConnRecord generated - OOB invitation (use_exising_connection) #1521

Merged
merged 12 commits into from
Dec 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
profile = context.profile
mgr = OutOfBandManager(profile)
try:
await mgr.receive_reuse_message(context.message, context.message_receipt)
await mgr.receive_reuse_message(
context.message, context.message_receipt, context.connection_record
)
except OutOfBandManagerError as e:
self._logger.exception(f"Error processing Handshake Reuse message, {e}")
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ async def test_called(self, mock_oob_mgr, request_context):
mock_oob_mgr.return_value.receive_reuse_message = async_mock.CoroutineMock()
request_context.message = HandshakeReuse()
handler = test_module.HandshakeReuseMessageHandler()
request_context.connection_record = ConnRecord()
responder = MockResponder()
await handler.handle(request_context, responder)
mock_oob_mgr.return_value.receive_reuse_message.assert_called_once_with(
request_context.message, request_context.message_receipt
request_context.message,
request_context.message_receipt,
request_context.connection_record,
)

@pytest.mark.asyncio
Expand All @@ -52,11 +55,13 @@ async def test_reuse_accepted(self, mock_oob_mgr, request_context):
mock_oob_mgr.return_value.receive_reuse_message.return_value = reuse_accepted
request_context.message = HandshakeReuse()
handler = test_module.HandshakeReuseMessageHandler()
request_context.connection_record = ConnRecord()
responder = MockResponder()
await handler.handle(request_context, responder)
mock_oob_mgr.return_value.receive_reuse_message.assert_called_once_with(
request_context.message,
request_context.message_receipt,
request_context.connection_record,
)

@pytest.mark.asyncio
Expand All @@ -68,6 +73,7 @@ async def test_exception(self, mock_oob_mgr, request_context):
)
request_context.message = HandshakeReuse()
handler = test_module.HandshakeReuseMessageHandler()
request_context.connection_record = ConnRecord()
responder = MockResponder()
await handler.handle(request_context, responder)
assert mock_oob_mgr.return_value._logger.exception.called_once_("error")
99 changes: 36 additions & 63 deletions aries_cloudagent/protocols/out_of_band/v1_0/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,7 @@ async def create_handshake_reuse_message(

"""
try:
# ID of Out-of-Band invitation to use as a pthid
# pthid = invi_msg._id
pthid = conn_record.invitation_msg_id
pthid = invi_msg._id
reuse_msg = HandshakeReuse()
thid = reuse_msg._id
reuse_msg.assign_thread_id(thid=thid, pthid=pthid)
Expand All @@ -1012,10 +1010,25 @@ async def create_handshake_reuse_message(
f"Error on creating and sending a handshake reuse message: {err}"
)

async def delete_stale_connection_by_invitation(self, invi_msg_id: str):
"""Delete unused connections, using existing an active connection instead."""
post_filter = {}
post_filter["invitation_msg_id"] = invi_msg_id
post_filter["invitation_mode"] = "once"
post_filter["state"] = "invitation"
async with self.profile.session() as session:
conn_records = await ConnRecord.query(
session,
post_filter_positive=post_filter,
)
for conn_rec in conn_records:
await conn_rec.delete_record(session)

async def receive_reuse_message(
self,
reuse_msg: HandshakeReuse,
receipt: MessageReceipt,
conn_rec: ConnRecord,
) -> None:
"""
Receive and process a HandshakeReuse message under RFC 0434.
Expand All @@ -1034,67 +1047,23 @@ async def receive_reuse_message(
or the connection does not exists

"""
try:
invi_msg_id = reuse_msg._thread.pthid
reuse_msg_id = reuse_msg._thread.thid
tag_filter = {}
post_filter = {}
# post_filter["state"] = "active"
# tag_filter["their_did"] = receipt.sender_did
post_filter["invitation_msg_id"] = invi_msg_id
conn_record = await self.find_existing_connection(
tag_filter=tag_filter, post_filter=post_filter
)
responder = self.profile.inject_or(BaseResponder)
if conn_record is not None:
# For ConnRecords created using did-exchange
reuse_accept_msg = HandshakeReuseAccept()
reuse_accept_msg.assign_thread_id(thid=reuse_msg_id, pthid=invi_msg_id)
connection_targets = await self.fetch_connection_targets(
connection=conn_record
)
if responder:
await responder.send(
message=reuse_accept_msg,
target_list=connection_targets,
)
# This is not required as now we attaching the invitation_msg_id
# using original invitation [from existing connection]
#
# Delete the ConnRecord created; re-use existing connection
# invi_id_post_filter = {}
# invi_id_post_filter["invitation_msg_id"] = invi_msg_id
# conn_rec_to_delete = await self.find_existing_connection(
# tag_filter={},
# post_filter=invi_id_post_filter,
# )
# if conn_rec_to_delete is not None:
# if conn_record.connection_id != conn_rec_to_delete.connection_id:
# await conn_rec_to_delete.delete_record(session=self._session)
else:
conn_record = await self.find_existing_connection(
tag_filter={"their_did": receipt.sender_did}, post_filter={}
)
# Problem Report is redundant in this case as with no active
# connection, it cannot reach the invitee any way
if conn_record is not None:
# For ConnRecords created using RFC 0160 connections
reuse_accept_msg = HandshakeReuseAccept()
reuse_accept_msg.assign_thread_id(
thid=reuse_msg_id, pthid=invi_msg_id
)
connection_targets = await self.fetch_connection_targets(
connection=conn_record
)
if responder:
await responder.send(
message=reuse_accept_msg,
target_list=connection_targets,
)
except StorageNotFoundError:
raise OutOfBandManagerError(
(f"No existing ConnRecord found for OOB Invitee, {receipt.sender_did}"),
invi_msg_id = reuse_msg._thread.pthid
reuse_msg_id = reuse_msg._thread.thid
responder = self.profile.inject_or(BaseResponder)
reuse_accept_msg = HandshakeReuseAccept()
reuse_accept_msg.assign_thread_id(thid=reuse_msg_id, pthid=invi_msg_id)
connection_targets = await self.fetch_connection_targets(connection=conn_rec)
if responder:
await responder.send(
message=reuse_accept_msg,
target_list=connection_targets,
)
# Update ConnRecord's invi_msg_id
async with self._profile.session() as session:
conn_rec.invitation_msg_id = invi_msg_id
await conn_rec.save(session, reason="Assigning new invitation_msg_id")
# Delete the ConnRecord created; re-use existing connection
await self.delete_stale_connection_by_invitation(invi_msg_id)

async def receive_reuse_accepted_message(
self,
Expand Down Expand Up @@ -1130,6 +1099,10 @@ async def receive_reuse_accepted_message(
await conn_record.metadata_set(
session=session, key="reuse_msg_state", value="accepted"
)
conn_record.invitation_msg_id = invi_msg_id
await conn_record.save(
session, reason="Assigning new invitation_msg_id"
)
except Exception as e:
raise OutOfBandManagerError(
(
Expand Down
75 changes: 35 additions & 40 deletions aries_cloudagent/protocols/out_of_band/v1_0/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from asynctest import mock as async_mock, TestCase as AsyncTestCase
from copy import deepcopy
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from uuid import UUID

from .....connections.models.conn_record import ConnRecord
Expand All @@ -22,7 +22,7 @@
)
from .....messaging.decorators.attach_decorator import AttachDecorator
from .....messaging.responder import BaseResponder, MockResponder
from .....messaging.util import str_to_epoch
from .....messaging.util import str_to_epoch, datetime_now, datetime_to_str
from .....multitenant.base import BaseMultitenantManager
from .....multitenant.manager import MultitenantManager
from .....protocols.coordinate_mediation.v1_0.models.mediation_record import (
Expand Down Expand Up @@ -1567,7 +1567,9 @@ async def test_receive_reuse_message_existing_found(self):
retrieve_invi_rec.return_value = InvitationRecord(
invi_msg_id="test_123"
)
await self.manager.receive_reuse_message(reuse_msg, receipt)
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)
assert (
len(
await ConnRecord.query(
Expand Down Expand Up @@ -1622,44 +1624,11 @@ async def test_receive_reuse_message_existing_not_found(self):
retrieve_invi_rec.return_value = InvitationRecord(
invi_msg_id="test_123"
)
await self.manager.receive_reuse_message(reuse_msg, receipt)
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)
assert len(self.responder.messages) == 0

async def test_receive_reuse_message_storage_not_found(self):
self.profile.context.update_settings({"public_invites": True})
receipt = MessageReceipt(
recipient_did=TestConfig.test_did,
recipient_did_public=False,
sender_did="test_did",
)
reuse_msg = HandshakeReuse()
reuse_msg.assign_thread_id(thid="test_123", pthid="test_123")

with async_mock.patch.object(
DIDXManager, "receive_invitation", autospec=True
) as didx_mgr_receive_invitation, async_mock.patch(
"aries_cloudagent.protocols.out_of_band.v1_0.manager.InvitationMessage",
autospec=True,
) as inv_message_cls, async_mock.patch.object(
OutOfBandManager,
"fetch_connection_targets",
autospec=True,
) as oob_mgr_fetch_conn, async_mock.patch.object(
InvitationRecord,
"retrieve_by_tag_filter",
autospec=True,
) as retrieve_invi_rec, async_mock.patch.object(
OutOfBandManager,
"find_existing_connection",
autospec=True,
) as oob_mgr_find_existing_conn:
oob_mgr_find_existing_conn.side_effect = StorageNotFoundError()
with self.assertRaises(OutOfBandManagerError) as context:
await self.manager.receive_reuse_message(reuse_msg, receipt)
assert "No existing ConnRecord found for OOB Invitee" in str(
context.exception
)

async def test_receive_reuse_message_problem_report_logic(self):
async with self.profile.session() as session:
self.profile.context.update_settings({"public_invites": True})
Expand All @@ -1685,7 +1654,9 @@ async def test_receive_reuse_message_problem_report_logic(self):
recipient_keys=TestConfig.test_verkey,
sender_key=TestConfig.test_verkey,
)
await self.manager.receive_reuse_message(reuse_msg, receipt)
await self.manager.receive_reuse_message(
reuse_msg, receipt, self.test_conn_rec
)

async def test_receive_reuse_accepted(self):
async with self.profile.session() as session:
Expand Down Expand Up @@ -3580,3 +3551,27 @@ async def test_request_attach_cred_offer_v2_check_conn_rec_active_timeout(self):
)
mock_logger_warning.assert_called_once()
assert conn_rec is not None

async def test_delete_stale_connection_by_invitation(self):
current_datetime = datetime_now()
older_datetime = current_datetime - timedelta(hours=4)
records = [
ConnRecord(
my_did=self.test_did,
their_did="FBmi5JLf5g58kDnNXMy4QM",
their_role=ConnRecord.Role.RESPONDER.rfc160,
state=ConnRecord.State.INVITATION.rfc160,
invitation_key="dummy2",
invitation_mode="once",
invitation_msg_id="test123",
updated_at=datetime_to_str(older_datetime),
)
]
with async_mock.patch.object(
ConnRecord, "query", async_mock.CoroutineMock()
) as mock_connrecord_query, async_mock.patch.object(
ConnRecord, "delete_record", async_mock.CoroutineMock()
) as mock_connrecord_delete:
mock_connrecord_query.return_value = records
await self.manager.delete_stale_connection_by_invitation("test123")
mock_connrecord_delete.assert_called_once()