Skip to content

Commit

Permalink
Code to allow endorser to write the transaction to ledger, and code t…
Browse files Browse the repository at this point in the history
…o save record in author's wallet.

Signed-off-by: Harsh Multani <[email protected]>
  • Loading branch information
HarshMultani-AyanWorks committed May 2, 2021
1 parent adf8256 commit d9d90a1
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Transaction acknowledgement message handler."""

from .....messaging.base_handler import (
BaseHandler,
BaseResponder,
HandlerException,
RequestContext,
)

from ..manager import TransactionManager, TransactionManagerError
from ..messages.transaction_acknowledgement import TransactionAcknowledgement


class TransactionAcknowledgementHandler(BaseHandler):
"""Message handler class for Acknowledging transaction."""

async def handle(self, context: RequestContext, responder: BaseResponder):
"""
Handle transaction acknowledgement message.
Args:
context: Request context
responder: Responder callback
"""

self._logger.debug(
f"TransactionAcknowledgementHandler called with context {context}"
)
assert isinstance(context.message, TransactionAcknowledgement)

if not context.connection_ready:
raise HandlerException("No connection established")

profile_session = await context.session()
mgr = TransactionManager(profile_session)
try:
await mgr.receive_transaction_acknowledgement(
context.message, context.connection_record.connection_id
)
except TransactionManagerError:
self._logger.exception("Error receiving transaction acknowledgement")
177 changes: 173 additions & 4 deletions aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Class to manage transactions."""

from aiohttp import web
import logging
import uuid
from asyncio import shield
import json
from time import time

from .models.transaction_record import TransactionRecord
from .messages.transaction_request import TransactionRequest
Expand All @@ -11,6 +13,7 @@
from .messages.cancel_transaction import CancelTransaction
from .messages.transaction_resend import TransactionResend
from .messages.transaction_job_to_send import TransactionJobToSend
from .messages.transaction_acknowledgement import TransactionAcknowledgement

from ....connections.models.conn_record import ConnRecord
from ....transport.inbound.receipt import MessageReceipt
Expand All @@ -19,6 +22,16 @@
from ....core.error import BaseError
from ....core.profile import ProfileSession

from ....ledger.base import BaseLedger

from ....indy.issuer import IndyIssuerError
from ....ledger.error import LedgerError

from ....storage.base import StorageRecord
from ....messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE
from ....messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE
from .transaction_jobs import TransactionJob


class TransactionManagerError(BaseError):
"""Transaction error."""
Expand Down Expand Up @@ -283,7 +296,7 @@ async def complete_transaction(self, transaction: TransactionRecord):
"""
Complete a transaction.
This is the final state after the received ledger transaction
This is the final state where the received ledger transaction
is written to the ledger.
Args:
Expand All @@ -294,11 +307,85 @@ async def complete_transaction(self, transaction: TransactionRecord):
"""

transaction.state = TransactionRecord.STATE_TRANSACTION_COMPLETED
profile_session = await self.session
transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED

async with profile_session.profile.session() as session:
await transaction.save(session, reason="Completed transaction")

connection_id = transaction.connection_id

async with profile_session.profile.session() as session:
connection_record = await ConnRecord.retrieve_by_id(session, connection_id)
jobs = await connection_record.metadata_get(self._session, "transaction_jobs")
if not jobs:
raise TransactionManagerError(
"The transaction related jobs are not set up in "
"connection metadata for this connection record"
)
if "transaction_my_job" not in jobs.keys():
raise TransactionManagerError(
'The "transaction_my_job" is not set in "transaction_jobs"'
" in connection metadata for this connection record"
)
if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name:
await self.store_record_in_wallet(transaction)

transaction_acknowledgement_message = TransactionAcknowledgement(
thread_id=transaction._id
)

return transaction, transaction_acknowledgement_message

async def receive_transaction_acknowledgement(
self, response: TransactionAcknowledgement, connection_id: str
):
"""
Update the transaction record after receiving the transaction acknowledgement.
Args:
response: The transaction acknowledgement
connection_id: The connection_id related to this Transaction Record
"""

profile_session = await self.session
async with profile_session.profile.session() as session:
transaction = await TransactionRecord.retrieve_by_connection_and_thread(
session, connection_id, response.thread_id
)

if transaction.state != TransactionRecord.STATE_TRANSACTION_ENDORSED:
raise TransactionManagerError(
"Only an endorsed transaction can be written to the ledger."
)

transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED
async with profile_session.profile.session() as session:
await transaction.save(session, reason="Received a transaction ack")

connection_id = transaction.connection_id

try:
async with profile_session.profile.session() as session:
connection_record = await ConnRecord.retrieve_by_id(
session, connection_id
)
except StorageNotFoundError as err:
raise TransactionManagerError(err.roll_up) from err
jobs = await connection_record.metadata_get(self._session, "transaction_jobs")
if not jobs:
raise TransactionManagerError(
"The transaction related jobs are not set up in "
"connection metadata for this connection record"
)
if "transaction_my_job" not in jobs.keys():
raise TransactionManagerError(
'The "transaction_my_job" is not set in "transaction_jobs"'
" in connection metadata for this connection record"
)
if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name:
await self.store_record_in_wallet(transaction)

return transaction

async def create_refuse_response(
Expand Down Expand Up @@ -531,7 +618,7 @@ async def set_transaction_their_job(
self._session, receipt.sender_did, receipt.recipient_did
)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
raise TransactionManagerError(err.roll_up) from err

value = await connection.metadata_get(self._session, "transaction_jobs")
if value:
Expand All @@ -541,3 +628,85 @@ async def set_transaction_their_job(
await connection.metadata_set(
self._session, key="transaction_jobs", value=value
)

async def store_record_in_wallet(self, transaction: TransactionRecord):
"""
Store record in wallet.
Args:
transaction: The transaction from which the schema/cred_def
would be stored in wallet.
"""

ledger_transaction = transaction.messages_attach[0]["data"]["json"]

ledger = self._session.inject(BaseLedger)
if not ledger:
reason = "No ledger available"
if not self._session.context.settings.get_value("wallet.type"):
reason += ": missing wallet-type?"
raise TransactionManagerError(reason)

async with ledger:
try:
ledger_response_json = await shield(
ledger.txn_submit(ledger_transaction, sign=False, taa_accept=False)
)
except (IndyIssuerError, LedgerError) as err:
raise TransactionManagerError(err.roll_up) from err

ledger_response = json.loads(ledger_response_json)

# write the wallet non-secrets record
# TODO refactor this code (duplicated from ledger.indy.py)
if ledger_response["result"]["txn"]["type"] == "101":
# schema transaction
schema_id = ledger_response["result"]["txnMetadata"]["txnId"]
schema_id_parts = schema_id.split(":")
public_did = ledger_response["result"]["txn"]["metadata"]["from"]
schema_tags = {
"schema_id": schema_id,
"schema_issuer_did": public_did,
"schema_name": schema_id_parts[-2],
"schema_version": schema_id_parts[-1],
"epoch": str(int(time())),
}
record = StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags)
# TODO refactor this code?
async with ledger:
storage = ledger.get_indy_storage()
await storage.add_record(record)

elif ledger_response["result"]["txn"]["type"] == "102":
# cred def transaction
async with ledger:
try:
schema_seq_no = str(ledger_response["result"]["txn"]["data"]["ref"])
schema_response = await shield(ledger.get_schema(schema_seq_no))
except (IndyIssuerError, LedgerError) as err:
raise TransactionManagerError(err.roll_up) from err

schema_id = schema_response["id"]
schema_id_parts = schema_id.split(":")
public_did = ledger_response["result"]["txn"]["metadata"]["from"]
credential_definition_id = ledger_response["result"]["txnMetadata"]["txnId"]
cred_def_tags = {
"schema_id": schema_id,
"schema_issuer_did": schema_id_parts[0],
"schema_name": schema_id_parts[-2],
"schema_version": schema_id_parts[-1],
"issuer_did": public_did,
"cred_def_id": credential_definition_id,
"epoch": str(int(time())),
}
record = StorageRecord(
CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags
)
# TODO refactor this code?
async with ledger:
storage = ledger.get_indy_storage()
await storage.add_record(record)

else:
# TODO unknown ledger transaction type, just ignore for now ...
pass
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CANCEL_TRANSACTION = "transactions/1.0/cancel"
TRANSACTION_RESEND = "transactions/1.0/resend"
TRANSACTION_JOB_TO_SEND = "transactions/1.0/transaction_my_job"
TRANSACTION_ACKNOWLEDGEMENT = "transactions/1.0/ack"
ATTACHED_MESSAGE = "transactions/1.0/message"
PROTOCOL_PACKAGE = "aries_cloudagent.protocols.endorse_transaction.v1_0"

Expand All @@ -36,5 +37,9 @@
TRANSACTION_JOB_TO_SEND: (
f"{PROTOCOL_PACKAGE}.messages.transaction_job_to_send.TransactionJobToSend"
),
TRANSACTION_ACKNOWLEDGEMENT: (
f"{PROTOCOL_PACKAGE}.messages.transaction_acknowledgement"
".TransactionAcknowledgement"
),
}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Represents a transaction acknowledgement message."""

from marshmallow import EXCLUDE, fields

from .....messaging.valid import UUIDFour
from .....messaging.ack.message import Ack, AckSchema

from ..message_types import TRANSACTION_ACKNOWLEDGEMENT, PROTOCOL_PACKAGE

HANDLER_CLASS = (
f"{PROTOCOL_PACKAGE}.handlers"
".transaction_acknowledgement_handler.TransactionAcknowledgementHandler"
)


class TransactionAcknowledgement(Ack):
"""Class representing a transaction acknowledgement message."""

class Meta:
"""Metadata for a transaction acknowledgement message."""

handler_class = HANDLER_CLASS
message_type = TRANSACTION_ACKNOWLEDGEMENT
schema_class = "TransactionAcknowledgementSchema"

def __init__(
self,
*,
thread_id: str = None,
**kwargs,
):
"""
Initialize a transaction acknowledgement object.
Args:
thread_id: Thread id of transaction record
"""
super().__init__(**kwargs)
self.thread_id = thread_id


class TransactionAcknowledgementSchema(AckSchema):
"""Transaction Acknowledgement schema class."""

class Meta:
"""Transaction Acknowledgement metadata."""

model_class = TransactionAcknowledgement
unknown = EXCLUDE

thread_id = fields.Str(required=True, example=UUIDFour.EXAMPLE)
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Meta:
STATE_TRANSACTION_RESENT = "transaction_resent"
STATE_TRANSACTION_RESENT_RECEIEVED = "transaction_resent_received"
STATE_TRANSACTION_CANCELLED = "transaction_cancelled"
STATE_TRANSACTION_COMPLETED = "transaction_completed"
STATE_TRANSACTION_ACKED = "transaction_acked"

def __init__(
self,
Expand Down
Loading

0 comments on commit d9d90a1

Please sign in to comment.