Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix revocation accum sync when endorsement txn fails #3547

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
HandlerException,
RequestContext,
)
from .....revocation.util import notify_rev_reg_entry_txn_failed
from .....storage.error import StorageError
from ..manager import TransactionManager, TransactionManagerError
from ..messages.endorsed_transaction_response import EndorsedTransactionResponse
Expand All @@ -30,12 +31,15 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
if not context.connection_ready:
raise HandlerException("No connection established")

# profile_session = await context.session()
async def send_failed_transaction_event(err_msg: str):
await notify_rev_reg_entry_txn_failed(context.profile, err_msg)

mgr = TransactionManager(context.profile)
try:
transaction = await mgr.receive_endorse_response(context.message)
except TransactionManagerError as err:
self._logger.exception("Error receiving endorsed transaction response")
await send_failed_transaction_event(str(err))
raise HandlerException(str(err))

# Automatically write transaction if flag is set
Expand All @@ -52,4 +56,5 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
)
except (StorageError, TransactionManagerError) as err:
self._logger.exception(err)
await send_failed_transaction_event(str(err))
raise HandlerException(str(err))
155 changes: 153 additions & 2 deletions acapy_agent/revocation/manager.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
"""Classes to manage credential revocation."""

import asyncio
import json
import logging
from typing import Mapping, NamedTuple, Optional, Sequence, Text, Tuple

from ..cache.base import BaseCache
from ..connections.models.conn_record import ConnRecord
from ..core.error import BaseError
from ..core.profile import Profile
from ..core.profile import Profile, ProfileSession
from ..indy.credx.issuer import CATEGORY_REV_REG
from ..indy.issuer import IndyIssuer
from ..ledger.base import BaseLedger
from ..messaging.responder import BaseResponder
from ..protocols.endorse_transaction.v1_0.manager import (
TransactionManager,
TransactionManagerError,
)
from ..protocols.endorse_transaction.v1_0.util import (
get_endorser_connection_id,
)
from ..protocols.issue_credential.v1_0.models.credential_exchange import (
V10CredentialExchange,
)
from ..protocols.issue_credential.v2_0.models.cred_ex_record import V20CredExRecord
from ..protocols.revocation_notification.v1_0.models.rev_notification_record import (
RevNotificationRecord,
)
from ..storage.error import StorageNotFoundError
from ..storage.error import StorageError, StorageNotFoundError
from .indy import IndyRevocation
from .models.issuer_cred_rev_record import IssuerCredRevRecord
from .models.issuer_rev_reg_record import IssuerRevRegRecord
from .util import notify_pending_cleared_event, notify_revocation_published_event

LOGGER = logging.getLogger(__name__)


class RevocationManagerError(BaseError):
"""Revocation manager error."""
Expand Down Expand Up @@ -498,3 +512,140 @@ async def set_cred_revoked_state(
await txn.commit()
except StorageNotFoundError:
pass

async def _get_endorser_info(self) -> Tuple[Optional[str], Optional[ConnRecord]]:
connection_id = await get_endorser_connection_id(self._profile)

endorser_did = None
async with self._profile.session() as session:
connection_record = await ConnRecord.retrieve_by_id(session, connection_id)
endorser_info = await connection_record.metadata_get(session, "endorser_info")
endorser_did = endorser_info.get("endorser_did")

return endorser_did, connection_record

async def fix_and_publish_from_invalid_accum_err(self, err_msg: str):
"""Fix and publish revocation registry entries from invalid accumulator error."""
cache = self._profile.inject_or(BaseCache)

async def check_retry(accum):
"""Used to manage retries for fixing revocation registry entries."""
retry_value = await cache.get(accum)
if not retry_value:
await cache.set(accum, 5)
else:
if retry_value > 0:
await cache.set(accum, retry_value - 1)
else:
LOGGER.error(
f"Revocation registry entry transaction failed for {accum}"
)

def get_genesis_transactions():
"""Get the genesis transactions needed for fixing broken accum."""
genesis_transactions = self._profile.context.settings.get(
"ledger.genesis_transactions"
)
if not genesis_transactions:
write_ledger = self._profile.context.injector.inject(BaseLedger)
pool = write_ledger.pool
genesis_transactions = pool.genesis_txns
return genesis_transactions

async def sync_accumulator(session: ProfileSession):
"""Sync the local accumulator with the ledger and create recovery txn."""
rev_reg_record = await IssuerRevRegRecord.retrieve_by_id(
session, rev_reg_entry.name
)

# Fix and get the recovery transaction
(
rev_reg_delta,
recovery_txn,
applied_txn,
) = await rev_reg_record.fix_ledger_entry(
self._profile, False, genesis_transactions
)

# Update locally assuming ledger write will succeed
rev_reg = await session.handle.fetch(
CATEGORY_REV_REG,
rev_reg_entry.value_json["revoc_reg_id"],
for_update=True,
)
new_value_json = rev_reg.value_json
new_value_json["value"]["accum"] = recovery_txn["value"]["accum"]
await session.handle.replace(
CATEGORY_REV_REG,
rev_reg.name,
json.dumps(new_value_json),
rev_reg.tags,
)

return rev_reg_record, recovery_txn

async def create_and_send_endorser_txn():
"""Create and send the endorser transaction again."""
async with ledger:
# Create the revocation registry entry
rev_entry_res = await ledger.send_revoc_reg_entry(
rev_reg_entry.value_json["revoc_reg_id"],
"CL_ACCUM",
recovery_txn,
rev_reg_record.issuer_did,
write_ledger=False,
endorser_did=endorser_did,
)

# Send the transaction to the endorser again with recovery txn
transaction_manager = TransactionManager(self._profile)
try:
revo_transaction = await transaction_manager.create_record(
messages_attach=rev_entry_res["result"],
connection_id=connection.connection_id,
)
(
revo_transaction,
revo_transaction_request,
) = await transaction_manager.create_request(transaction=revo_transaction)
except (StorageError, TransactionManagerError) as err:
raise RevocationManagerError(err.roll_up) from err

responder = self._profile.inject_or(BaseResponder)
if not responder:
raise RevocationManagerError(
"No responder found. Unable to send transaction request"
)
await responder.send(
revo_transaction_request,
connection_id=connection.connection_id,
)

async with self._profile.session() as session:
rev_reg_records = await session.handle.fetch_all(
IssuerRevRegRecord.RECORD_TYPE
)
# Cycle through all rev_rev_def records to find the offending accumulator
for rev_reg_entry in rev_reg_records:
ledger = session.inject_or(BaseLedger)
# Get the value from the ledger
async with ledger:
(accum_response, _) = await ledger.get_revoc_reg_delta(
rev_reg_entry.value_json["revoc_reg_id"]
)
accum = accum_response.get("value", {}).get("accum")

# If the accum from the ledger matches the error message, fix it
if accum and accum in err_msg:
await check_retry(accum)

# Get the genesis transactions needed for fix
genesis_transactions = get_genesis_transactions()

# We know this needs endorsement
endorser_did, connection = await self._get_endorser_info()
rev_reg_record, recovery_txn = await sync_accumulator(session=session)
await create_and_send_endorser_txn()
Comment on lines +645 to +648
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an endorser is not required (i.e.: the agent is set-up with higher privileges than author), how is this handled? In our scenarios we will likely only ever use an endorser to get transactions approved to be written on the ledger, but I think it may not be the only scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it would go through the existing re-sync and re-try flow. Also the manual fix-revocation-state could be applied for no endorser, although it shouldn't be needed. https://github.com/openwallet-foundation/acapy/blob/main/acapy_agent/revocation/models/issuer_rev_reg_record.py#L336

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the endorser_did isn't configured then aca-py should try to write the ledger transaction directly.


# Some time in between re-tries
await asyncio.sleep(1)
10 changes: 10 additions & 0 deletions acapy_agent/revocation/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,10 @@ def register_events(event_bus: EventBus):
re.compile(f"^{REVOCATION_EVENT_PREFIX}{REVOCATION_ENTRY_EVENT}.*"),
on_revocation_entry_event,
)
event_bus.subscribe(
re.compile(f"^{REVOCATION_EVENT_PREFIX}REV_REG_ENTRY_TXN_FAILED.*"),
on_rev_reg_entry_txn_failed,
)


async def on_revocation_registry_init_event(profile: Profile, event: Event):
Expand Down Expand Up @@ -1747,6 +1751,12 @@ async def on_revocation_registry_endorsed_event(profile: Profile, event: Event):
)


async def on_rev_reg_entry_txn_failed(profile: Profile, event: Event):
"""Handle revocation registry entry transaction failed event."""
manager = RevocationManager(profile)
await manager.fix_and_publish_from_invalid_accum_err(event.payload.get("msg"))


class TailsDeleteResponseSchema(OpenAPISchema):
"""Return schema for tails deletion."""

Expand Down
99 changes: 99 additions & 0 deletions acapy_agent/revocation/tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import json
from unittest import IsolatedAsyncioTestCase

from uuid_utils import uuid4

from ...cache.base import BaseCache
from ...cache.in_memory import InMemoryCache
from ...connections.models.conn_record import ConnRecord
from ...indy.credx.issuer import CATEGORY_REV_REG
from ...indy.issuer import IndyIssuer
from ...ledger.base import BaseLedger
from ...messaging.responder import BaseResponder
from ...protocols.issue_credential.v1_0.models.credential_exchange import (
V10CredentialExchange,
)
Expand All @@ -14,6 +21,7 @@
from ...utils.testing import create_test_profile
from .. import manager as test_module
from ..manager import RevocationManager, RevocationManagerError
from ..models.issuer_rev_reg_record import IssuerRevRegRecord

TEST_DID = "LjgpST2rjsoxYegQDRm7EL"
SCHEMA_NAME = "bc-reg"
Expand Down Expand Up @@ -889,3 +897,94 @@ async def test_set_revoked_state_v2(self):
session, crev_record.record_id
)
assert check_crev_record.state == IssuerCredRevRecord.STATE_REVOKED

@mock.patch.object(
ConnRecord,
"retrieve_by_id",
mock.CoroutineMock(
return_value=mock.MagicMock(
connection_id="endorser-id",
metadata_get=mock.CoroutineMock(
return_value={"endorser_did": "test_endorser_did"}
),
)
),
)
@mock.patch.object(
IssuerRevRegRecord,
"fix_ledger_entry",
mock.CoroutineMock(
return_value=(
"1 ...",
{
"ver": "1.0",
"value": {
"prevAccum": "1 ...",
"accum": "fixed-accum",
"issued": [1],
},
},
[1],
)
),
)
async def test_fix_and_publish_from_invalid_accum_err(
self,
):
# Setup
self.profile.context.injector.bind_instance(BaseCache, InMemoryCache())
self.profile.context.injector.bind_instance(
BaseResponder, mock.MagicMock(BaseResponder, autospec=True)
)
mock_ledger = mock.MagicMock(BaseLedger, autospec=True)
mock_ledger.get_revoc_reg_delta = mock.CoroutineMock(
side_effect=[
({"value": {"accum": "other-accum"}}, None),
({"value": {"accum": "invalid-accum"}}, None),
]
)
mock_ledger.send_revoc_reg_entry = mock.CoroutineMock(
return_value={"result": {"txn": "..."}}
)
self.profile.context.injector.bind_instance(BaseLedger, mock_ledger)
self.profile.context.settings.set_value(
"ledger.genesis_transactions", {"txn": "..."}
)
self.profile.context.settings.set_value("endorser.endorser_alias", "endorser")

async with self.profile.session() as session:
# Add an endorser connection
await session.handle.insert(
ConnRecord.RECORD_TYPE,
name="endorser",
value_json={"connection_id": "endorser", "alias": "endorser"},
)
record = ConnRecord(
alias="endorser",
)
await record.save(session)

# Add a couple rev reg records
for _ in range(2):
await session.handle.insert(
IssuerRevRegRecord.RECORD_TYPE,
name=str(uuid4()),
value_json={
"revoc_reg_id": "test-rr-id",
},
)

# Need a matching revocation_reg record
await session.handle.insert(
CATEGORY_REV_REG,
name="test-rr-id",
value_json={
"value": {
"accum": "invalid-accum",
"revoked": [1],
}
},
)

# Execute
await self.manager.fix_and_publish_from_invalid_accum_err("invalid-accum")
9 changes: 9 additions & 0 deletions acapy_agent/revocation/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,12 @@ async def notify_pending_cleared_event(
"""Send notification of credential revoked as issuer."""
topic = f"{REVOCATION_EVENT_PREFIX}{REVOCATION_CLEAR_PENDING_EVENT}::{rev_reg_id}"
await profile.notify(topic, {"rev_reg_id": rev_reg_id})


async def notify_rev_reg_entry_txn_failed(
profile: Profile,
msg: str,
):
"""Send notification that a revocation registry entry transaction failed."""
topic = f"{REVOCATION_EVENT_PREFIX}REV_REG_ENTRY_TXN_FAILED"
await profile.notify(topic, {"msg": msg})
Loading