Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust revocation registry update procedure to shorten transactions #1804

Merged
merged 2 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 104 additions & 78 deletions aries_cloudagent/indy/credx/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
)

from ...askar.profile import AskarProfile
from ...core.profile import ProfileSession

from ..issuer import (
IndyIssuer,
Expand Down Expand Up @@ -384,7 +383,6 @@ async def revoke_credentials(
revoc_reg_id: str,
tails_file_path: str,
cred_revoc_ids: Sequence[str],
transaction: ProfileSession = None,
) -> Tuple[str, Sequence[str]]:
"""
Revoke a set of credentials in a revocation registry.
Expand All @@ -399,66 +397,84 @@ async def revoke_credentials(

"""

txn = transaction if transaction else await self._profile.transaction()
try:
rev_reg_def = await txn.handle.fetch(CATEGORY_REV_REG_DEF, revoc_reg_id)
rev_reg = await txn.handle.fetch(
CATEGORY_REV_REG, revoc_reg_id, for_update=True
)
rev_reg_info = await txn.handle.fetch(
CATEGORY_REV_REG_INFO, revoc_reg_id, for_update=True
)
if not rev_reg_def:
raise IndyIssuerError("Revocation registry definition not found")
if not rev_reg:
raise IndyIssuerError("Revocation registry not found")
if not rev_reg_info:
raise IndyIssuerError("Revocation registry metadata not found")
except AskarError as err:
raise IndyIssuerError("Error retrieving revocation registry") from err
delta = None
failed_crids = set()
max_attempt = 5
attempt = 0

try:
rev_reg_def = RevocationRegistryDefinition.load(rev_reg_def.raw_value)
except CredxError as err:
raise IndyIssuerError(
"Error loading revocation registry definition"
) from err

rev_crids = []
failed_crids = []
max_cred_num = rev_reg_def.max_cred_num
rev_info = rev_reg_info.value_json
used_ids = set(rev_info.get("used_ids") or [])

for rev_id in cred_revoc_ids:
rev_id = int(rev_id)
if rev_id < 1 or rev_id > max_cred_num:
LOGGER.error(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s not in range",
revoc_reg_id,
rev_id,
)
elif rev_id > rev_info["curr_id"]:
LOGGER.warn(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s not yet issued",
revoc_reg_id,
rev_id,
)
elif rev_id in used_ids:
LOGGER.warn(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s already revoked",
revoc_reg_id,
rev_id,
)
else:
rev_crids.append(rev_id)
while True:
attempt += 1
if attempt >= max_attempt:
raise IndyIssuerError("Repeated conflict attempting to update registry")
try:
async with self._profile.session() as session:
rev_reg_def = await session.handle.fetch(
CATEGORY_REV_REG_DEF, revoc_reg_id
)
rev_reg = await session.handle.fetch(CATEGORY_REV_REG, revoc_reg_id)
rev_reg_info = await session.handle.fetch(
CATEGORY_REV_REG_INFO, revoc_reg_id
)
if not rev_reg_def:
raise IndyIssuerError("Revocation registry definition not found")
if not rev_reg:
raise IndyIssuerError("Revocation registry not found")
if not rev_reg_info:
raise IndyIssuerError("Revocation registry metadata not found")
except AskarError as err:
raise IndyIssuerError("Error retrieving revocation registry") from err

try:
rev_reg_def = RevocationRegistryDefinition.load(rev_reg_def.raw_value)
except CredxError as err:
raise IndyIssuerError(
"Error loading revocation registry definition"
) from err

rev_crids = set()
failed_crids = set()
max_cred_num = rev_reg_def.max_cred_num
rev_info = rev_reg_info.value_json
used_ids = set(rev_info.get("used_ids") or [])

for rev_id in cred_revoc_ids:
rev_id = int(rev_id)
if rev_id < 1 or rev_id > max_cred_num:
LOGGER.error(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s not in range",
revoc_reg_id,
rev_id,
)
failed_crids.add(rev_id)
elif rev_id > rev_info["curr_id"]:
LOGGER.warn(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s not yet issued",
revoc_reg_id,
rev_id,
)
failed_crids.add(rev_id)
elif rev_id in used_ids:
LOGGER.warn(
"Skipping requested credential revocation"
"on rev reg id %s, cred rev id=%s already revoked",
revoc_reg_id,
rev_id,
)
failed_crids.add(rev_id)
else:
rev_crids.add(rev_id)

if not rev_crids:
break

if rev_crids:
try:
rev_reg = RevocationRegistry.load(rev_reg.raw_value)
except CredxError as err:
raise IndyIssuerError("Error loading revocation registry") from err

try:
delta = await asyncio.get_event_loop().run_in_executor(
None,
lambda: rev_reg.update(
Expand All @@ -472,31 +488,41 @@ async def revoke_credentials(
raise IndyIssuerError("Error updating revocation registry") from err

try:
await txn.handle.replace(
CATEGORY_REV_REG, revoc_reg_id, rev_reg.to_json_buffer()
)
used_ids.update(rev_crids)
rev_info["used_ids"] = list(used_ids)
await txn.handle.replace(
CATEGORY_REV_REG_INFO, revoc_reg_id, value_json=rev_info
)
for cred_rev_id in rev_crids:
issuer_cr_rec = await IssuerCredRevRecord.retrieve_by_ids(
txn,
revoc_reg_id,
str(cred_rev_id),
async with self._profile.transaction() as txn:
rev_reg_upd = await txn.handle.fetch(
CATEGORY_REV_REG, revoc_reg_id, for_update=True
)
rev_info_upd = await txn.handle.fetch(
CATEGORY_REV_REG_INFO, revoc_reg_id, for_update=True
)
await issuer_cr_rec.set_state(
txn, IssuerCredRevRecord.STATE_REVOKED
if not rev_reg_upd or not rev_reg_info:
LOGGER.warn(
"Revocation registry missing, skipping update: {}",
revoc_reg_id,
)
delta = None
break
rev_info_upd = rev_info_upd.value_json
if rev_info_upd != rev_info:
# handle concurrent update to the registry by retrying
continue
await txn.handle.replace(
CATEGORY_REV_REG, revoc_reg_id, rev_reg.to_json_buffer()
)
used_ids.update(rev_crids)
rev_info_upd["used_ids"] = sorted(used_ids)
await txn.handle.replace(
CATEGORY_REV_REG_INFO, revoc_reg_id, value_json=rev_info_upd
)
if not transaction:
await txn.commit()
except AskarError as err:
raise IndyIssuerError("Error saving revocation registry") from err
else:
delta = None
break

return (delta and delta.to_json(), failed_crids)
return (
delta and delta.to_json(),
[str(rev_id) for rev_id in sorted(failed_crids)],
)

async def merge_revocation_registry_deltas(
self, fro_delta: str, to_delta: str
Expand Down Expand Up @@ -567,7 +593,7 @@ async def create_and_store_revocation_registry(
rev_reg_def,
rev_reg_def_private,
rev_reg,
rev_reg_delta,
_rev_reg_delta,
) = await asyncio.get_event_loop().run_in_executor(
None,
lambda: RevocationRegistryDefinition.create(
Expand Down
2 changes: 0 additions & 2 deletions aries_cloudagent/indy/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import Sequence, Tuple

from ..core.error import BaseError
from ..core.profile import ProfileSession


DEFAULT_CRED_DEF_TAG = "default"
Expand Down Expand Up @@ -150,7 +149,6 @@ async def revoke_credentials(
revoc_reg_id: str,
tails_file_path: str,
cred_rev_ids: Sequence[str],
transaction: ProfileSession = None,
) -> Tuple[str, Sequence[str]]:
"""
Revoke a set of credentials in a revocation registry.
Expand Down
18 changes: 3 additions & 15 deletions aries_cloudagent/indy/sdk/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import indy.blob_storage
from indy.error import AnoncredsRevocationRegistryFullError, IndyError, ErrorCode

from ...core.profile import ProfileSession
from ...indy.sdk.profile import IndySdkProfile
from ...messaging.util import encode
from ...revocation.models.issuer_cred_rev_record import IssuerCredRevRecord
Expand Down Expand Up @@ -267,7 +266,6 @@ async def revoke_credentials(
rev_reg_id: str,
tails_file_path: str,
cred_rev_ids: Sequence[str],
transaction: ProfileSession = None,
) -> Tuple[str, Sequence[str]]:
"""
Revoke a set of credentials in a revocation registry.
Expand All @@ -281,7 +279,7 @@ async def revoke_credentials(
Tuple with the combined revocation delta, list of cred rev ids not revoked

"""
failed_crids = []
failed_crids = set()
tails_reader_handle = await create_tails_reader(tails_file_path)

result_json = None
Expand All @@ -290,22 +288,12 @@ async def revoke_credentials(
"Exception when revoking credential", IndyIssuerError
):
try:
session = await self.profile.session()
delta_json = await indy.anoncreds.issuer_revoke_credential(
self.profile.wallet.handle,
tails_reader_handle,
rev_reg_id,
cred_rev_id,
)
issuer_cr_rec = await IssuerCredRevRecord.retrieve_by_ids(
session,
rev_reg_id,
cred_rev_id,
)
await issuer_cr_rec.set_state(
session, IssuerCredRevRecord.STATE_REVOKED
)

except IndyError as err:
if err.error_code == ErrorCode.AnoncredsInvalidUserRevocId:
LOGGER.error(
Expand All @@ -323,7 +311,7 @@ async def revoke_credentials(
err, "Revocation error", IndyIssuerError
).roll_up
)
failed_crids.append(cred_rev_id)
failed_crids.add(int(cred_rev_id))
continue
except StorageError as err:
LOGGER.warning(
Expand All @@ -344,7 +332,7 @@ async def revoke_credentials(
else:
result_json = delta_json

return (result_json, failed_crids)
return (result_json, [str(rev_id) for rev_id in sorted(failed_crids)])

async def merge_revocation_registry_deltas(
self, fro_delta: str, to_delta: str
Expand Down
Loading