diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py index 34a6052b2e..938d2a2a1f 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_issue_handler.py @@ -50,7 +50,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) # Automatically move to next state if flag is set - if context.settings.get("debug.auto_store_credential"): + if cred_ex_record and context.settings.get("debug.auto_store_credential"): try: cred_ex_record = await credential_manager.store_credential( cred_ex_record diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py index c148f0632e..af1f3844ae 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_offer_handler.py @@ -52,7 +52,9 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) # If auto respond is turned on, automatically reply with credential request - if context.settings.get("debug.auto_respond_credential_offer"): + if cred_ex_record and context.settings.get( + "debug.auto_respond_credential_offer" + ): credential_request_message = None try: ( diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py index 1ceacfd7b8..bc6d028c10 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/handlers/credential_request_handler.py @@ -52,7 +52,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) # If auto_issue is enabled, respond immediately - if cred_ex_record.auto_issue: + if cred_ex_record and cred_ex_record.auto_issue: if ( cred_ex_record.credential_proposal_dict and cred_ex_record.credential_proposal_dict.credential_proposal diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py index 0802dc4e99..75fe7895ac 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py @@ -346,35 +346,42 @@ async def receive_offer( cred_def_id=cred_def_id, ) - async with self._profile.session() as session: + async with self._profile.transaction() as txn: # Get credential exchange record (holder sent proposal first) # or create it (issuer sent offer first) try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - session, connection_id, message._thread_id + txn, connection_id, message._thread_id, for_update=True ) ) - cred_ex_record.credential_proposal_dict = credential_proposal_dict except StorageNotFoundError: # issuer sent this offer free of any proposal cred_ex_record = V10CredentialExchange( connection_id=connection_id, thread_id=message._thread_id, initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_HOLDER, - credential_proposal_dict=credential_proposal_dict, auto_remove=not self._profile.settings.get( "preserve_exchange_records" ), trace=(message._trace is not None), ) + else: + if cred_ex_record.state != V10CredentialExchange.STATE_PROPOSAL_SENT: + raise CredentialManagerError( + f"Credential exchange {cred_ex_record.credential_exchange_id} " + f"in {cred_ex_record.state} state " + f"(must be {V10CredentialExchange.STATE_PROPOSAL_SENT})" + ) + cred_ex_record.credential_proposal_dict = credential_proposal_dict cred_ex_record.credential_offer = indy_offer cred_ex_record.state = V10CredentialExchange.STATE_OFFER_RECEIVED cred_ex_record.schema_id = schema_id cred_ex_record.credential_definition_id = cred_def_id - await cred_ex_record.save(session, reason="receive credential offer") + await cred_ex_record.save(txn, reason="receive credential offer") + await txn.commit() return cred_ex_record @@ -402,6 +409,8 @@ async def create_request( credential_definition_id = cred_ex_record.credential_definition_id cred_offer_ser = cred_ex_record._credential_offer.ser + cred_req_ser = None + cred_req_meta = None async def _create(): ledger_exec_inst = self._profile.inject(IndyLedgerRequestsExecutor) @@ -432,6 +441,8 @@ async def _create(): "create_request called multiple times for v1.0 credential exchange: %s", cred_ex_record.credential_exchange_id, ) + cred_req_ser = cred_ex_record._credential_request.ser + cred_req_meta = cred_ex_record.credential_request_metadata else: nonce = cred_offer_ser["nonce"] cache_key = ( @@ -448,27 +459,33 @@ async def _create(): await entry.set_result(cred_req_result, 3600) if not cred_req_result: cred_req_result = await _create() - - ( - cred_ex_record.credential_request, - cred_ex_record.credential_request_metadata, - ) = (cred_req_result["request"], cred_req_result["metadata"]) + cred_req_ser = cred_req_result["request"] + cred_req_meta = cred_req_result["metadata"] credential_request_message = CredentialRequest( - requests_attach=[ - CredentialRequest.wrap_indy_cred_req( - cred_ex_record._credential_request.ser - ) - ] + requests_attach=[CredentialRequest.wrap_indy_cred_req(cred_req_ser)] ) credential_request_message._thread = {"thid": cred_ex_record.thread_id} credential_request_message.assign_trace_decorator( self._profile.settings, cred_ex_record.trace ) - cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_SENT - async with self._profile.session() as session: - await cred_ex_record.save(session, reason="create credential request") + async with self._profile.transaction() as txn: + cred_ex_record = await V10CredentialExchange.retrieve_by_id( + txn, cred_ex_record.credential_exchange_id, for_update=True + ) + if cred_ex_record.state != V10CredentialExchange.STATE_OFFER_RECEIVED: + raise CredentialManagerError( + f"Credential exchange {cred_ex_record.credential_exchange_id} " + f"in {cred_ex_record.state} state " + f"(must be {V10CredentialExchange.STATE_OFFER_RECEIVED})" + ) + + cred_ex_record.credential_request = cred_req_ser + cred_ex_record.credential_request_metadata = cred_req_meta + cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_SENT + await cred_ex_record.save(txn, reason="create credential request") + await txn.commit() return (cred_ex_record, credential_request_message) @@ -486,28 +503,37 @@ async def receive_request(self, message: CredentialRequest, connection_id: str): assert len(message.requests_attach or []) == 1 credential_request = message.indy_cred_req(0) - async with self._profile.session() as session: + async with self._profile.transaction() as txn: try: cred_ex_record = await ( V10CredentialExchange.retrieve_by_connection_and_thread( - session, connection_id, message._thread_id + txn, connection_id, message._thread_id, for_update=True ) ) except StorageNotFoundError: try: cred_ex_record = await V10CredentialExchange.retrieve_by_tag_filter( - session, + txn, {"thread_id": message._thread_id}, {"connection_id": None}, + for_update=True, ) cred_ex_record.connection_id = connection_id except StorageNotFoundError: raise CredentialManagerError( "Indy issue credential format can't start from credential request" - ) + ) from None + if cred_ex_record.state != V10CredentialExchange.STATE_OFFER_SENT: + LOGGER.error( + "Skipping credential request; exchange state is %s (id=%s)", + cred_ex_record.state, + cred_ex_record.credential_exchange_id, + ) + return None cred_ex_record.credential_request = credential_request cred_ex_record.state = V10CredentialExchange.STATE_REQUEST_RECEIVED - await cred_ex_record.save(session, reason="receive credential request") + await cred_ex_record.save(txn, reason="receive credential request") + await txn.commit() return cred_ex_record @@ -540,6 +566,7 @@ async def issue_credential( schema_id = cred_ex_record.schema_id rev_reg = None + credential_ser = None if cred_ex_record.credential: LOGGER.warning( @@ -547,6 +574,7 @@ async def issue_credential( + "credential exchange record %s - abstaining", cred_ex_record.credential_exchange_id, ) + credential_ser = cred_ex_record._credential.ser else: cred_offer_ser = cred_ex_record._credential_offer.ser cred_req_ser = cred_ex_record._credential_request.ser @@ -623,7 +651,7 @@ async def issue_credential( raise CredentialManagerError( f"Cred def id {cred_ex_record.credential_definition_id} " "has no active revocation registry" - ) + ) from None del revoc credential_values = ( @@ -645,6 +673,7 @@ async def issue_credential( cred_ex_record.revoc_reg_id, tails_path, ) + credential_ser = json.loads(credential_json) # If the rev reg is now full if rev_reg and rev_reg.max_creds == int(cred_ex_record.revocation_id): @@ -687,12 +716,20 @@ async def issue_credential( raise - cred_ex_record.credential = json.loads(credential_json) - - cred_ex_record.state = V10CredentialExchange.STATE_ISSUED - async with self._profile.session() as session: - # FIXME - re-fetch record to check state, apply transactional update - await cred_ex_record.save(session, reason="issue credential") + async with self._profile.transaction() as txn: + cred_ex_record = await V10CredentialExchange.retrieve_by_id( + txn, cred_ex_record.credential_exchange_id, for_update=True + ) + if cred_ex_record.state != V10CredentialExchange.STATE_REQUEST_RECEIVED: + raise CredentialManagerError( + f"Credential exchange {cred_ex_record.credential_exchange_id} " + f"in {cred_ex_record.state} state " + f"(must be {V10CredentialExchange.STATE_REQUEST_RECEIVED})" + ) + cred_ex_record.state = V10CredentialExchange.STATE_ISSUED + cred_ex_record.credential = credential_ser + await cred_ex_record.save(txn, reason="issue credential") + await txn.commit() credential_message = CredentialIssue( comment=comment, @@ -722,20 +759,29 @@ async def receive_credential( assert len(message.credentials_attach or []) == 1 raw_credential = message.indy_credential(0) - # FIXME use transaction, fetch for_update - async with self._profile.session() as session: - cred_ex_record = await ( - V10CredentialExchange.retrieve_by_connection_and_thread( - session, - connection_id, - message._thread_id, + async with self._profile.transaction() as txn: + try: + cred_ex_record = await ( + V10CredentialExchange.retrieve_by_connection_and_thread( + txn, connection_id, message._thread_id, for_update=True + ) + ) + except StorageNotFoundError: + raise CredentialManagerError( + "No credential exchange record found for received credential" + ) from None + if cred_ex_record.state != V10CredentialExchange.STATE_REQUEST_SENT: + raise CredentialManagerError( + f"Credential exchange {cred_ex_record.credential_exchange_id} " + f"in {cred_ex_record.state} state " + f"(must be {V10CredentialExchange.STATE_REQUEST_SENT})" ) - ) - cred_ex_record.raw_credential = raw_credential cred_ex_record.state = V10CredentialExchange.STATE_CREDENTIAL_RECEIVED - await cred_ex_record.save(session, reason="receive credential") + await cred_ex_record.save(txn, reason="receive credential") + await txn.commit() + return cred_ex_record async def store_credential( @@ -753,7 +799,7 @@ async def store_credential( Updated credential exchange record """ - if cred_ex_record.state != (V10CredentialExchange.STATE_CREDENTIAL_RECEIVED): + if cred_ex_record.state != V10CredentialExchange.STATE_CREDENTIAL_RECEIVED: raise CredentialManagerError( f"Credential exchange {cred_ex_record.credential_exchange_id} " f"in {cred_ex_record.state} state " @@ -802,20 +848,29 @@ async def store_credential( rev_reg_def=revoc_reg_def, ) except IndyHolderError as e: - LOGGER.error(f"Error storing credential. {e.error_code}: {e.message}") + LOGGER.error("Error storing credential: %s: %s", e.error_code, e.message) raise e credential_json = await holder.get_credential(credential_id) credential = json.loads(credential_json) - cred_ex_record.credential_id = credential_id - cred_ex_record.credential = credential - cred_ex_record.revoc_reg_id = credential.get("rev_reg_id", None) - cred_ex_record.revocation_id = credential.get("cred_rev_id", None) + async with self._profile.transaction() as txn: + cred_ex_record = await V10CredentialExchange.retrieve_by_id( + txn, cred_ex_record.credential_exchange_id, for_update=True + ) + if cred_ex_record.state != V10CredentialExchange.STATE_CREDENTIAL_RECEIVED: + raise CredentialManagerError( + f"Credential exchange {cred_ex_record.credential_exchange_id} " + f"in {cred_ex_record.state} state " + f"(must be {V10CredentialExchange.STATE_CREDENTIAL_RECEIVED})" + ) - async with self._profile.session() as session: - # FIXME - re-fetch record to check state, apply transactional update - await cred_ex_record.save(session, reason="store credential") + cred_ex_record.credential_id = credential_id + cred_ex_record.credential = credential + cred_ex_record.revoc_reg_id = credential.get("rev_reg_id", None) + cred_ex_record.revocation_id = credential.get("cred_rev_id", None) + await cred_ex_record.save(txn, reason="store credential") + await txn.commit() return cred_ex_record @@ -840,13 +895,36 @@ async def send_credential_ack( self._profile.settings, cred_ex_record.trace ) - cred_ex_record.state = V10CredentialExchange.STATE_ACKED try: - async with self._profile.session() as session: - # FIXME - re-fetch record to check state, apply transactional update - await cred_ex_record.save(session, reason="ack credential") + async with self._profile.transaction() as txn: + try: + cred_ex_record = await V10CredentialExchange.retrieve_by_id( + txn, cred_ex_record.credential_exchange_id, for_update=True + ) + except StorageNotFoundError: + LOGGER.warning( + "Skipping credential exchange ack, record not found: '%s'", + cred_ex_record.credential_exchange_id, + ) + return None + + if ( + cred_ex_record.state + != V10CredentialExchange.STATE_CREDENTIAL_RECEIVED + ): + LOGGER.warning( + "Skipping credential exchange ack, state is '%s' for record '%s'", + cred_ex_record.state, + cred_ex_record.credential_exchange_id, + ) + return None + + cred_ex_record.state = V10CredentialExchange.STATE_ACKED + await cred_ex_record.save(txn, reason="ack credential") + await txn.commit() - if cred_ex_record.auto_remove: + if cred_ex_record.auto_remove: + async with self._profile.session() as session: await cred_ex_record.delete_record(session) # all done: delete except StorageError: @@ -866,7 +944,7 @@ async def send_credential_ack( cred_ex_record.thread_id, ) - return cred_ex_record, credential_ack_message + return credential_ack_message async def receive_credential_ack( self, message: CredentialAck, connection_id: str @@ -878,18 +956,25 @@ async def receive_credential_ack( credential exchange record, retrieved and updated """ - # FIXME use transaction, fetch for_update - async with self._profile.session() as session: - cred_ex_record = await ( - V10CredentialExchange.retrieve_by_connection_and_thread( - session, - connection_id, + async with self._profile.transaction() as txn: + try: + cred_ex_record = await ( + V10CredentialExchange.retrieve_by_connection_and_thread( + txn, connection_id, message._thread_id, for_update=True + ) + ) + except StorageNotFoundError: + LOGGER.warning( + "Skip ack message on credential exchange, record not found %s", message._thread_id, ) - ) + return None + if cred_ex_record.state == V10CredentialExchange.STATE_ACKED: + return None cred_ex_record.state = V10CredentialExchange.STATE_ACKED - await cred_ex_record.save(session, reason="credential acked") + await cred_ex_record.save(txn, reason="credential acked") + await txn.commit() if cred_ex_record.auto_remove: async with self._profile.session() as session: @@ -907,15 +992,19 @@ async def receive_problem_report( credential exchange record, retrieved and updated """ - # FIXME use transaction, fetch for_update - async with self._profile.session() as session: - cred_ex_record = await ( - V10CredentialExchange.retrieve_by_connection_and_thread( - session, - connection_id, + async with self._profile.transaction() as txn: + try: + cred_ex_record = await ( + V10CredentialExchange.retrieve_by_connection_and_thread( + txn, connection_id, message._thread_id, for_update=True + ) + ) + except StorageNotFoundError: + LOGGER.warning( + "Skip problem report on credential exchange, record not found %s", message._thread_id, ) - ) + return None cred_ex_record.state = V10CredentialExchange.STATE_ABANDONED code = message.description.get( @@ -923,6 +1012,7 @@ async def receive_problem_report( ProblemReportReason.ISSUANCE_ABANDONED.value, ) cred_ex_record.error_msg = f"{code}: {message.description.get('en', code)}" - await cred_ex_record.save(session, reason="received problem report") + await cred_ex_record.save(txn, reason="received problem report") + await txn.commit() return cred_ex_record diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py index 1a32ba2abc..5df1eedd74 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py @@ -262,18 +262,24 @@ def record_value(self) -> dict: @classmethod async def retrieve_by_connection_and_thread( - cls, session: ProfileSession, connection_id: str, thread_id: str + cls, + session: ProfileSession, + connection_id: str, + thread_id: str, + *, + for_update=False, ) -> "V10CredentialExchange": """Retrieve a credential exchange record by connection and thread ID.""" cache_key = f"credential_exchange_ctidx::{connection_id}::{thread_id}" record_id = await cls.get_cached_key(session, cache_key) if record_id: - record = await cls.retrieve_by_id(session, record_id) + record = await cls.retrieve_by_id(session, record_id, for_update=for_update) else: record = await cls.retrieve_by_tag_filter( session, {"thread_id": thread_id}, {"connection_id": connection_id} if connection_id else None, + for_update=for_update, ) await cls.set_cached_key(session, cache_key, record.credential_exchange_id) return record diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/tests/test_manager.py b/aries_cloudagent/protocols/issue_credential/v1_0/tests/test_manager.py index 91a34af9ac..68ad3d7c3b 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/tests/test_manager.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/tests/test_manager.py @@ -55,6 +55,9 @@ async def setUp(self): setattr( self.profile, "session", async_mock.MagicMock(return_value=self.session) ) + setattr( + self.profile, "transaction", async_mock.MagicMock(return_value=self.session) + ) Ledger = async_mock.MagicMock() self.ledger = Ledger() @@ -278,12 +281,14 @@ async def test_create_free_offer(self): credential_proposal=preview, cred_def_id=CRED_DEF_ID, schema_id=None ) - exchange = V10CredentialExchange( + stored_exchange = V10CredentialExchange( credential_exchange_id="dummy-cxid", credential_definition_id=CRED_DEF_ID, role=V10CredentialExchange.ROLE_ISSUER, credential_proposal_dict=proposal.serialize(), + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -313,25 +318,27 @@ async def test_create_free_offer(self): await self.session.storage.add_record(cred_def_record) (ret_exchange, ret_offer) = await self.manager.create_offer( - cred_ex_record=exchange, + cred_ex_record=stored_exchange, counter_proposal=None, comment=comment, ) - assert ret_exchange is exchange + assert ret_exchange is stored_exchange save_ex.assert_called_once() issuer.create_credential_offer.assert_called_once_with(CRED_DEF_ID) - assert exchange.credential_exchange_id == ret_exchange._id # cover property - assert exchange.thread_id == ret_offer._thread_id - assert exchange.credential_definition_id == CRED_DEF_ID - assert exchange.role == V10CredentialExchange.ROLE_ISSUER - assert exchange.schema_id == SCHEMA_ID - assert exchange.state == V10CredentialExchange.STATE_OFFER_SENT - assert exchange._credential_offer.ser == INDY_OFFER + assert ( + stored_exchange.credential_exchange_id == ret_exchange._id + ) # cover property + assert stored_exchange.thread_id == ret_offer._thread_id + assert stored_exchange.credential_definition_id == CRED_DEF_ID + assert stored_exchange.role == V10CredentialExchange.ROLE_ISSUER + assert stored_exchange.schema_id == SCHEMA_ID + assert stored_exchange.state == V10CredentialExchange.STATE_OFFER_SENT + assert stored_exchange._credential_offer.ser == INDY_OFFER (ret_exchange, ret_offer) = await self.manager.create_offer( - cred_ex_record=exchange, + cred_ex_record=stored_exchange, counter_proposal=None, comment=comment, ) # once more to cover case where offer is available in cache @@ -352,12 +359,14 @@ async def test_create_free_offer_attr_mismatch(self): credential_proposal=preview, cred_def_id=CRED_DEF_ID, schema_id=None ) - exchange = V10CredentialExchange( + stored_exchange = V10CredentialExchange( credential_exchange_id="dummy-cxid", credential_definition_id=CRED_DEF_ID, role=V10CredentialExchange.ROLE_ISSUER, credential_proposal_dict=proposal.serialize(), + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -388,7 +397,7 @@ async def test_create_free_offer_attr_mismatch(self): with self.assertRaises(CredentialManagerError): await self.manager.create_offer( - cred_ex_record=exchange, + cred_ex_record=stored_exchange, counter_proposal=None, comment=comment, ) @@ -407,11 +416,13 @@ async def test_create_bound_offer(self): ) ) proposal = CredentialProposal(credential_proposal=preview) - exchange = V10CredentialExchange( + stored_exchange = V10CredentialExchange( credential_exchange_id="dummy-cxid", credential_proposal_dict=proposal.serialize(), role=V10CredentialExchange.ROLE_ISSUER, + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -443,21 +454,21 @@ async def test_create_bound_offer(self): await self.session.storage.add_record(cred_def_record) (ret_exchange, ret_offer) = await self.manager.create_offer( - cred_ex_record=exchange, + cred_ex_record=stored_exchange, counter_proposal=None, comment=comment, ) - assert ret_exchange is exchange + assert ret_exchange is stored_exchange save_ex.assert_called_once() issuer.create_credential_offer.assert_called_once_with(CRED_DEF_ID) - assert exchange.thread_id == ret_offer._thread_id - assert exchange.schema_id == SCHEMA_ID - assert exchange.credential_definition_id == CRED_DEF_ID - assert exchange.role == V10CredentialExchange.ROLE_ISSUER - assert exchange.state == V10CredentialExchange.STATE_OFFER_SENT - assert exchange._credential_offer.ser == INDY_OFFER + assert stored_exchange.thread_id == ret_offer._thread_id + assert stored_exchange.schema_id == SCHEMA_ID + assert stored_exchange.credential_definition_id == CRED_DEF_ID + assert stored_exchange.role == V10CredentialExchange.ROLE_ISSUER + assert stored_exchange.state == V10CredentialExchange.STATE_OFFER_SENT + assert stored_exchange._credential_offer.ser == INDY_OFFER # additionally check that credential preview was passed through assert ret_offer.credential_preview.attributes == preview.attributes @@ -476,11 +487,13 @@ async def test_create_bound_offer_no_cred_def(self): ) ) proposal = CredentialProposal(credential_proposal=preview) - exchange = V10CredentialExchange( + stored_exchange = V10CredentialExchange( credential_exchange_id="dummy-cxid", credential_proposal_dict=proposal.serialize(), role=V10CredentialExchange.ROLE_ISSUER, + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -498,7 +511,7 @@ async def test_create_bound_offer_no_cred_def(self): with self.assertRaises(CredentialManagerError): await self.manager.create_offer( - cred_ex_record=exchange, + cred_ex_record=stored_exchange, counter_proposal=None, comment=comment, ) @@ -529,9 +542,12 @@ async def test_receive_offer_proposed(self): credential_proposal_dict=proposal.serialize(), initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_HOLDER, + state=V10CredentialExchange.STATE_PROPOSAL_SENT, schema_id=SCHEMA_ID, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -604,7 +620,9 @@ async def test_create_request(self): state=V10CredentialExchange.STATE_OFFER_RECEIVED, schema_id=SCHEMA_ID, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) self.cache = InMemoryCache() self.context.injector.bind_instance(BaseCache, self.cache) @@ -646,10 +664,9 @@ async def test_create_request(self): stored_exchange.state = V10CredentialExchange.STATE_OFFER_RECEIVED stored_exchange.credential_request = INDY_CRED_REQ ( - ret_existing_exchange, + _ret_existing_exchange, ret_existing_request, ) = await self.manager.create_request(stored_exchange, holder_did) - assert ret_existing_exchange == ret_exchange assert ret_existing_request._thread_id == thread_id async def test_create_request_no_cache(self): @@ -667,7 +684,9 @@ async def test_create_request_no_cache(self): state=V10CredentialExchange.STATE_OFFER_RECEIVED, schema_id=SCHEMA_ID, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -712,7 +731,9 @@ async def test_create_request_bad_state(self): state=V10CredentialExchange.STATE_PROPOSAL_SENT, schema_id=SCHEMA_ID, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) with self.assertRaises(CredentialManagerError): await self.manager.create_request(stored_exchange, holder_did) @@ -725,7 +746,10 @@ async def test_receive_request(self): connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_ISSUER, + state=V10CredentialExchange.STATE_OFFER_SENT, + new_with_id=True, ) + await stored_exchange.save(self.session) request = CredentialRequest( requests_attach=[CredentialRequest.wrap_indy_cred_req(INDY_CRED_REQ)] @@ -741,7 +765,7 @@ async def test_receive_request(self): exchange = await self.manager.receive_request(request, connection_id) retrieve_ex.assert_called_once_with( - self.session, connection_id, request._thread_id + self.session, connection_id, request._thread_id, for_update=True ) save_ex.assert_called_once() @@ -753,7 +777,10 @@ async def test_receive_request_no_connection_cred_request(self): credential_exchange_id="dummy-cxid", initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_ISSUER, + state=V10CredentialExchange.STATE_OFFER_SENT, + new_with_id=True, ) + await stored_exchange.save(self.session) request = CredentialRequest( requests_attach=[CredentialRequest.wrap_indy_cred_req(INDY_CRED_REQ)] @@ -773,10 +800,13 @@ async def test_receive_request_no_connection_cred_request(self): cx_rec = await self.manager.receive_request(request, "test_conn_id") mock_retrieve.assert_called_once_with( - self.session, "test_conn_id", request._thread_id + self.session, "test_conn_id", request._thread_id, for_update=True ) mock_retrieve_tag_filter.assert_called_once_with( - self.session, {"thread_id": request._thread_id}, {"connection_id": None} + self.session, + {"thread_id": request._thread_id}, + {"connection_id": None}, + for_update=True, ) mock_save.assert_called_once() assert cx_rec.state == V10CredentialExchange.STATE_REQUEST_RECEIVED @@ -788,7 +818,10 @@ async def test_receive_request_no_cred_ex_with_offer_found(self): credential_exchange_id="dummy-cxid", initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_ISSUER, + state=V10CredentialExchange.STATE_OFFER_SENT, + new_with_id=True, ) + await stored_exchange.save(self.session) request = CredentialRequest( requests_attach=[CredentialRequest.wrap_indy_cred_req(INDY_CRED_REQ)] @@ -809,12 +842,13 @@ async def test_receive_request_no_cred_ex_with_offer_found(self): cx_rec = await self.manager.receive_request(request, "test_conn_id") mock_retrieve.assert_called_once_with( - self.session, "test_conn_id", request._thread_id + self.session, "test_conn_id", request._thread_id, for_update=True ) mock_retrieve_tag_filter.assert_called_once_with( self.session, {"thread_id": request._thread_id}, {"connection_id": None}, + for_update=True, ) async def test_issue_credential(self): @@ -840,7 +874,9 @@ async def test_issue_credential(self): role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -892,6 +928,7 @@ async def test_issue_credential(self): # cover case with existing cred stored_exchange.credential = cred stored_exchange.state = V10CredentialExchange.STATE_REQUEST_RECEIVED + await stored_exchange.save(self.session) ( ret_existing_exchange, ret_existing_cred, @@ -926,7 +963,9 @@ async def test_issue_credential_non_revocable(self): role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -1000,7 +1039,9 @@ async def test_issue_credential_fills_rr(self): state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, revocation_id="1000", + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -1075,7 +1116,9 @@ async def test_issue_credential_request_bad_state(self): state=V10CredentialExchange.STATE_PROPOSAL_SENT, schema_id=SCHEMA_ID, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) with self.assertRaises(CredentialManagerError): await self.manager.issue_credential(stored_exchange) @@ -1103,7 +1146,9 @@ async def test_issue_credential_no_active_rr_no_retries(self): role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -1167,7 +1212,9 @@ async def test_issue_credential_no_active_rr_retry(self): role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -1235,7 +1282,9 @@ async def test_issue_credential_rr_full(self): role=V10CredentialExchange.ROLE_ISSUER, state=V10CredentialExchange.STATE_REQUEST_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) issuer = async_mock.MagicMock() cred = {"indy": "credential"} @@ -1284,7 +1333,10 @@ async def test_receive_credential(self): connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_ISSUER, + state=V10CredentialExchange.STATE_REQUEST_SENT, + new_with_id=True, ) + await stored_exchange.save(self.session) issue = CredentialIssue( credentials_attach=[CredentialIssue.wrap_indy_credential(INDY_CRED)] @@ -1300,7 +1352,7 @@ async def test_receive_credential(self): exchange = await self.manager.receive_credential(issue, connection_id) retrieve_ex.assert_called_once_with( - self.session, connection_id, issue._thread_id + self.session, connection_id, issue._thread_id, for_update=True ) save_ex.assert_called_once() @@ -1337,7 +1389,9 @@ async def test_store_credential(self): state=V10CredentialExchange.STATE_CREDENTIAL_RECEIVED, thread_id=thread_id, auto_remove=True, + new_with_id=True, ) + await stored_exchange.save(self.session) cred_id = "cred-id" holder = async_mock.MagicMock() @@ -1406,7 +1460,9 @@ async def test_store_credential_bad_state(self): role=V10CredentialExchange.ROLE_HOLDER, state=V10CredentialExchange.STATE_OFFER_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) cred_id = "cred-id" with self.assertRaises(CredentialManagerError): @@ -1435,7 +1491,9 @@ async def test_store_credential_no_preview(self): role=V10CredentialExchange.ROLE_HOLDER, state=V10CredentialExchange.STATE_CREDENTIAL_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) cred_def = async_mock.MagicMock() self.ledger.get_credential_definition = async_mock.CoroutineMock( @@ -1503,7 +1561,9 @@ async def test_store_credential_holder_store_indy_error(self): role=V10CredentialExchange.ROLE_HOLDER, state=V10CredentialExchange.STATE_CREDENTIAL_RECEIVED, thread_id=thread_id, + new_with_id=True, ) + await stored_exchange.save(self.session) cred_def = async_mock.MagicMock() self.ledger.get_credential_definition = async_mock.CoroutineMock( @@ -1535,12 +1595,15 @@ async def test_send_credential_ack(self): credential_exchange_id="dummy-cxid", connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_SELF, + state=V10CredentialExchange.STATE_CREDENTIAL_RECEIVED, thread_id="thid", parent_thread_id="pthid", role=V10CredentialExchange.ROLE_ISSUER, trace=False, auto_remove=True, + new_with_id=True, ) + await stored_exchange.save(self.session) with async_mock.patch.object( V10CredentialExchange, "save", autospec=True @@ -1552,16 +1615,15 @@ async def test_send_credential_ack(self): test_module.LOGGER, "warning", async_mock.MagicMock() ) as mock_log_warning: mock_delete_ex.side_effect = test_module.StorageError() - (_, ack) = await self.manager.send_credential_ack(stored_exchange) + ack = await self.manager.send_credential_ack(stored_exchange) assert ack._thread mock_log_exception.assert_called_once() # cover exception log-and-continue mock_log_warning.assert_called_once() # no BaseResponder mock_responder = MockResponder() # cover with responder self.context.injector.bind_instance(BaseResponder, mock_responder) - (cx_rec, ack) = await self.manager.send_credential_ack(stored_exchange) + ack = await self.manager.send_credential_ack(stored_exchange) assert ack._thread - assert cx_rec.state == V10CredentialExchange.STATE_ACKED async def test_receive_credential_ack(self): connection_id = "connection-id" @@ -1570,7 +1632,9 @@ async def test_receive_credential_ack(self): connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_SELF, role=V10CredentialExchange.ROLE_ISSUER, + new_with_id=True, ) + await stored_exchange.save(self.session) ack = CredentialAck() @@ -1587,7 +1651,7 @@ async def test_receive_credential_ack(self): ret_exchange = await self.manager.receive_credential_ack(ack, connection_id) retrieve_ex.assert_called_once_with( - self.session, connection_id, ack._thread_id + self.session, connection_id, ack._thread_id, for_update=True ) save_ex.assert_called_once() @@ -1601,7 +1665,9 @@ async def test_receive_problem_report(self): connection_id=connection_id, initiator=V10CredentialExchange.INITIATOR_SELF, role=V10CredentialExchange.ROLE_ISSUER, + new_with_id=True, ) + await stored_exchange.save(self.session) problem = CredentialProblemReport( description={ "code": test_module.ProblemReportReason.ISSUANCE_ABANDONED.value, @@ -1622,7 +1688,7 @@ async def test_receive_problem_report(self): problem, connection_id ) retrieve_ex.assert_called_once_with( - self.session, connection_id, problem._thread_id + self.session, connection_id, problem._thread_id, for_update=True ) save_ex.assert_called_once() @@ -1630,12 +1696,6 @@ async def test_receive_problem_report(self): async def test_receive_problem_report_x(self): connection_id = "connection-id" - stored_exchange = V10CredentialExchange( - credential_exchange_id="dummy-cxid", - initiator=V10CredentialExchange.INITIATOR_SELF, - role=V10CredentialExchange.ROLE_ISSUER, - state=V10CredentialExchange.STATE_REQUEST_RECEIVED, - ) problem = CredentialProblemReport( description={ "code": test_module.ProblemReportReason.ISSUANCE_ABANDONED.value, @@ -1650,8 +1710,8 @@ async def test_receive_problem_report_x(self): ) as retrieve_ex: retrieve_ex.side_effect = test_module.StorageNotFoundError("No such record") - with self.assertRaises(test_module.StorageNotFoundError): - await self.manager.receive_problem_report(problem, connection_id) + exch = await self.manager.receive_problem_report(problem, connection_id) + assert exch is None async def test_retrieve_records(self): self.cache = InMemoryCache()