Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate schema in send_schema by always fetching first #126

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aries_cloudagent/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ def __init__(self, *args, error_code: str = None, **kwargs):
if error_code:
self.error_code = error_code

@property
def message(self) -> str:
"""Accessor for the error message."""
return self.args and self.args[0]


class StartupError(BaseError):
"""Error raised when there is a problem starting the conductor."""
4 changes: 0 additions & 4 deletions aries_cloudagent/ledger/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,3 @@ class ClosedPoolError(LedgerError):

class LedgerTransactionError(LedgerError):
"""The ledger rejected the transaction."""


class DuplicateSchemaError(LedgerError):
"""The schema already exists on the ledger."""
89 changes: 66 additions & 23 deletions aries_cloudagent/ledger/indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from .error import (
BadLedgerRequestError,
ClosedPoolError,
DuplicateSchemaError,
LedgerError,
LedgerTransactionError,
)
Expand Down Expand Up @@ -210,11 +209,6 @@ async def _submit(self, request_json: str, sign=True) -> str:

operation = request_result.get("op", "")

# HACK: If only there were a better way to identify this kind
# of rejected request...
if "can have one and only one SCHEMA with name" in request_result_json:
raise DuplicateSchemaError()

if operation in ("REQNACK", "REJECT"):
raise LedgerTransactionError(
f"Ledger rejected transaction request: {request_result['reason']}"
Expand All @@ -241,30 +235,74 @@ async def send_schema(

"""

public_did = await self.wallet.get_public_did()
if not public_did:
public_info = await self.wallet.get_public_did()
if not public_info:
raise BadLedgerRequestError("Cannot publish schema without a public DID")

with IndyErrorHandler("Exception when creating schema definition"):
schema_id, schema_json = await indy.anoncreds.issuer_create_schema(
public_did.did, schema_name, schema_version, json.dumps(attribute_names)
)
schema_id = await self.check_existing_schema(
public_info.did, schema_name, schema_version, attribute_names
)
if schema_id:
self.logger.warning("Schema already exists on ledger. Returning ID.")
else:
with IndyErrorHandler("Exception when creating schema definition"):
schema_id, schema_json = await indy.anoncreds.issuer_create_schema(
public_info.did,
schema_name,
schema_version,
json.dumps(attribute_names),
)

with IndyErrorHandler("Exception when building schema request"):
request_json = await indy.ledger.build_schema_request(
public_did.did, schema_json
)
with IndyErrorHandler("Exception when building schema request"):
request_json = await indy.ledger.build_schema_request(
public_info.did, schema_json
)

try:
await self._submit(request_json)
except DuplicateSchemaError as e:
self.logger.warning(
"Schema already exists on ledger. Returning ID. Error: %s", e
)
schema_id = f"{public_did.did}:{2}:{schema_name}:{schema_version}"
try:
await self._submit(request_json)
except LedgerTransactionError as e:
# Identify possible duplicate schema errors on indy-node < 1.9 and > 1.9
if (
"can have one and only one SCHEMA with name" in e.message
or "UnauthorizedClientRequest" in e.message
):
# handle potential race condition if multiple agents are publishing
# the same schema simultaneously
schema_id = await self.check_existing_schema(
public_info.did, schema_name, schema_version, attribute_names
)
if schema_id:
self.logger.warning(
"Schema already exists on ledger. Returning ID. Error: %s",
e,
)
else:
raise

return schema_id

async def check_existing_schema(
self,
public_did: str,
schema_name: str,
schema_version: str,
attribute_names: Sequence[str],
) -> str:
"""Check if a schema has already been published."""
fetch_schema_id = f"{public_did}:2:{schema_name}:{schema_version}"
schema = await self.fetch_schema(fetch_schema_id)
if schema:
fetched_attrs = schema["attrNames"].copy()
fetched_attrs.sort()
cmp_attrs = list(attribute_names)
cmp_attrs.sort()
if fetched_attrs != cmp_attrs:
raise LedgerTransactionError(
"Schema already exists on ledger, but attributes do not match: "
+ f"{schema_name}:{schema_version} {fetched_attrs} != {cmp_attrs}"
)
return fetch_schema_id

async def get_schema(self, schema_id: str):
"""
Get a schema from the cache if available, otherwise fetch from the ledger.
Expand Down Expand Up @@ -296,6 +334,11 @@ async def fetch_schema(self, schema_id: str):
)

response_json = await self._submit(request_json, sign=bool(public_did))
response = json.loads(response_json)
if not response["result"]["seqNo"]:
# schema not found
return None

with IndyErrorHandler("Exception when parsing schema response"):
_, parsed_schema_json = await indy.ledger.parse_get_schema_response(
response_json
Expand Down
25 changes: 15 additions & 10 deletions aries_cloudagent/ledger/tests/test_indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
BadLedgerRequestError,
ClosedPoolError,
LedgerTransactionError,
DuplicateSchemaError,
)


@pytest.mark.indy
class TestIndyLedger(AsyncTestCase):
test_did = "55GkHamhTU1ZbTbV2ab9DE"

@async_mock.patch("builtins.open")
def test_init(self, mock_open):
mock_open.return_value = async_mock.MagicMock()
Expand Down Expand Up @@ -98,6 +99,7 @@ async def test_submit_signed(

mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

await ledger._submit("{}", True)

Expand Down Expand Up @@ -186,12 +188,14 @@ async def test_submit_rejected(
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_open")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_close")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger.fetch_schema")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_schema_request")
async def test_send_schema(
self,
mock_build_schema_req,
mock_create_schema,
mock_fetch_schema,
mock_submit,
mock_close,
mock_open,
Expand All @@ -201,6 +205,7 @@ async def test_send_schema(
ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

mock_create_schema.return_value = ("schema_id", "{}")
mock_fetch_schema.return_value = None

async with ledger:
mock_wallet.get_public_did = async_mock.CoroutineMock()
Expand All @@ -213,6 +218,7 @@ async def test_send_schema(

mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

schema_id = await ledger.send_schema(
"schema_name", "schema_version", [1, 2, 3]
Expand All @@ -235,14 +241,14 @@ async def test_send_schema(
@async_mock.patch("indy.pool.create_pool_ledger_config")
@async_mock.patch("indy.pool.open_pool_ledger")
@async_mock.patch("indy.pool.close_pool_ledger")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger.check_existing_schema")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_schema_request")
async def test_send_schema_already_exists(
self,
mock_build_schema_req,
mock_create_schema,
mock_submit,
mock_check_existing,
mock_close_pool,
mock_open_ledger,
mock_create_config,
Expand All @@ -256,40 +262,39 @@ async def test_send_schema_already_exists(

mock_create_schema.return_value = (1, 2)

mock_submit.side_effect = DuplicateSchemaError
fetch_schema_id = f"{mock_wallet.get_public_did.return_value.did}:{2}:schema_name:schema_version"
mock_check_existing.return_value = fetch_schema_id

ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

async with ledger:
schema_id = await ledger.send_schema(
"schema_name", "schema_version", [1, 2, 3]
)
assert (
schema_id
== f"{mock_wallet.get_public_did.return_value.did}:{2}:schema_name:schema_version"
)
assert schema_id == fetch_schema_id

@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_open")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_close")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_get_schema_request")
@async_mock.patch("indy.ledger.parse_get_schema_response")
async def test_get_schema(
self,
mock_parse_get_schema_req,
mock_build_get_schema_req,
mock_create_schema,
mock_submit,
mock_close,
mock_open,
):
mock_wallet = async_mock.MagicMock()
mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

mock_parse_get_schema_req.return_value = (None, "{}")

mock_submit.return_value = "{\"result\":{\"seqNo\":1}}"

ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

async with ledger:
Expand Down
8 changes: 0 additions & 8 deletions aries_cloudagent/verifier/tests/test_indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@

import pytest

from aries_cloudagent.ledger.indy import (
IndyLedger,
GENESIS_TRANSACTION_PATH,
ClosedPoolError,
LedgerTransactionError,
DuplicateSchemaError,
)

from aries_cloudagent.verifier.indy import IndyVerifier


Expand Down