Skip to content

Commit

Permalink
handle duplicate schema in send_schema by always fetching first
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <[email protected]>
  • Loading branch information
andrewwhitehead committed Aug 7, 2019
1 parent 33321aa commit 0cfd3fa
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 45 deletions.
5 changes: 5 additions & 0 deletions aries_cloudagent/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ def __init__(self, *args, error_code: str = None, **kwargs):
if error_code:
self.error_code = error_code

@property
def message(self) -> str:
"""Accessor for the error message."""
return self.args and self.args[0]


class StartupError(BaseError):
"""Error raised when there is a problem starting the conductor."""
4 changes: 0 additions & 4 deletions aries_cloudagent/ledger/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,3 @@ class ClosedPoolError(LedgerError):

class LedgerTransactionError(LedgerError):
"""The ledger rejected the transaction."""


class DuplicateSchemaError(LedgerError):
"""The schema already exists on the ledger."""
89 changes: 66 additions & 23 deletions aries_cloudagent/ledger/indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from .error import (
BadLedgerRequestError,
ClosedPoolError,
DuplicateSchemaError,
LedgerError,
LedgerTransactionError,
)
Expand Down Expand Up @@ -210,11 +209,6 @@ async def _submit(self, request_json: str, sign=True) -> str:

operation = request_result.get("op", "")

# HACK: If only there were a better way to identify this kind
# of rejected request...
if "can have one and only one SCHEMA with name" in request_result_json:
raise DuplicateSchemaError()

if operation in ("REQNACK", "REJECT"):
raise LedgerTransactionError(
f"Ledger rejected transaction request: {request_result['reason']}"
Expand All @@ -241,30 +235,74 @@ async def send_schema(
"""

public_did = await self.wallet.get_public_did()
if not public_did:
public_info = await self.wallet.get_public_did()
if not public_info:
raise BadLedgerRequestError("Cannot publish schema without a public DID")

with IndyErrorHandler("Exception when creating schema definition"):
schema_id, schema_json = await indy.anoncreds.issuer_create_schema(
public_did.did, schema_name, schema_version, json.dumps(attribute_names)
)
schema_id = await self.check_existing_schema(
public_info.did, schema_name, schema_version, attribute_names
)
if schema_id:
self.logger.warning("Schema already exists on ledger. Returning ID.")
else:
with IndyErrorHandler("Exception when creating schema definition"):
schema_id, schema_json = await indy.anoncreds.issuer_create_schema(
public_info.did,
schema_name,
schema_version,
json.dumps(attribute_names),
)

with IndyErrorHandler("Exception when building schema request"):
request_json = await indy.ledger.build_schema_request(
public_did.did, schema_json
)
with IndyErrorHandler("Exception when building schema request"):
request_json = await indy.ledger.build_schema_request(
public_info.did, schema_json
)

try:
await self._submit(request_json)
except DuplicateSchemaError as e:
self.logger.warning(
"Schema already exists on ledger. Returning ID. Error: %s", e
)
schema_id = f"{public_did.did}:{2}:{schema_name}:{schema_version}"
try:
await self._submit(request_json)
except LedgerTransactionError as e:
# Identify possible duplicate schema errors on indy-node < 1.9 and > 1.9
if (
"can have one and only one SCHEMA with name" in e.message
or "UnauthorizedClientRequest" in e.message
):
# handle potential race condition if multiple agents are publishing
# the same schema simultaneously
schema_id = await self.check_existing_schema(
public_info.did, schema_name, schema_version, attribute_names
)
if schema_id:
self.logger.warning(
"Schema already exists on ledger. Returning ID. Error: %s",
e,
)
else:
raise

return schema_id

async def check_existing_schema(
self,
public_did: str,
schema_name: str,
schema_version: str,
attribute_names: Sequence[str],
) -> str:
"""Check if a schema has already been published."""
fetch_schema_id = f"{public_did}:2:{schema_name}:{schema_version}"
schema = await self.fetch_schema(fetch_schema_id)
if schema:
fetched_attrs = schema["attrNames"].copy()
fetched_attrs.sort()
cmp_attrs = list(attribute_names)
cmp_attrs.sort()
if fetched_attrs != cmp_attrs:
raise LedgerTransactionError(
"Schema already exists on ledger, but attributes do not match: "
+ f"{schema_name}:{schema_version} {fetched_attrs} != {cmp_attrs}"
)
return fetch_schema_id

async def get_schema(self, schema_id: str):
"""
Get a schema from the cache if available, otherwise fetch from the ledger.
Expand Down Expand Up @@ -296,6 +334,11 @@ async def fetch_schema(self, schema_id: str):
)

response_json = await self._submit(request_json, sign=bool(public_did))
response = json.loads(response_json)
if not response["result"]["seqNo"]:
# schema not found
return None

with IndyErrorHandler("Exception when parsing schema response"):
_, parsed_schema_json = await indy.ledger.parse_get_schema_response(
response_json
Expand Down
25 changes: 15 additions & 10 deletions aries_cloudagent/ledger/tests/test_indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
BadLedgerRequestError,
ClosedPoolError,
LedgerTransactionError,
DuplicateSchemaError,
)


@pytest.mark.indy
class TestIndyLedger(AsyncTestCase):
test_did = "55GkHamhTU1ZbTbV2ab9DE"

@async_mock.patch("builtins.open")
def test_init(self, mock_open):
mock_open.return_value = async_mock.MagicMock()
Expand Down Expand Up @@ -98,6 +99,7 @@ async def test_submit_signed(

mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

await ledger._submit("{}", True)

Expand Down Expand Up @@ -186,12 +188,14 @@ async def test_submit_rejected(
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_open")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_close")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger.fetch_schema")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_schema_request")
async def test_send_schema(
self,
mock_build_schema_req,
mock_create_schema,
mock_fetch_schema,
mock_submit,
mock_close,
mock_open,
Expand All @@ -201,6 +205,7 @@ async def test_send_schema(
ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

mock_create_schema.return_value = ("schema_id", "{}")
mock_fetch_schema.return_value = None

async with ledger:
mock_wallet.get_public_did = async_mock.CoroutineMock()
Expand All @@ -213,6 +218,7 @@ async def test_send_schema(

mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

schema_id = await ledger.send_schema(
"schema_name", "schema_version", [1, 2, 3]
Expand All @@ -235,14 +241,14 @@ async def test_send_schema(
@async_mock.patch("indy.pool.create_pool_ledger_config")
@async_mock.patch("indy.pool.open_pool_ledger")
@async_mock.patch("indy.pool.close_pool_ledger")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger.check_existing_schema")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_schema_request")
async def test_send_schema_already_exists(
self,
mock_build_schema_req,
mock_create_schema,
mock_submit,
mock_check_existing,
mock_close_pool,
mock_open_ledger,
mock_create_config,
Expand All @@ -256,40 +262,39 @@ async def test_send_schema_already_exists(

mock_create_schema.return_value = (1, 2)

mock_submit.side_effect = DuplicateSchemaError
fetch_schema_id = f"{mock_wallet.get_public_did.return_value.did}:{2}:schema_name:schema_version"
mock_check_existing.return_value = fetch_schema_id

ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

async with ledger:
schema_id = await ledger.send_schema(
"schema_name", "schema_version", [1, 2, 3]
)
assert (
schema_id
== f"{mock_wallet.get_public_did.return_value.did}:{2}:schema_name:schema_version"
)
assert schema_id == fetch_schema_id

@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_open")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._context_close")
@async_mock.patch("aries_cloudagent.ledger.indy.IndyLedger._submit")
@async_mock.patch("indy.anoncreds.issuer_create_schema")
@async_mock.patch("indy.ledger.build_get_schema_request")
@async_mock.patch("indy.ledger.parse_get_schema_response")
async def test_get_schema(
self,
mock_parse_get_schema_req,
mock_build_get_schema_req,
mock_create_schema,
mock_submit,
mock_close,
mock_open,
):
mock_wallet = async_mock.MagicMock()
mock_wallet.get_public_did = async_mock.CoroutineMock()
mock_did = mock_wallet.get_public_did.return_value
mock_did.did = self.test_did

mock_parse_get_schema_req.return_value = (None, "{}")

mock_submit.return_value = "{\"result\":{\"seqNo\":1}}"

ledger = IndyLedger("name", mock_wallet, "genesis_transactions")

async with ledger:
Expand Down
8 changes: 0 additions & 8 deletions aries_cloudagent/verifier/tests/test_indy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@

import pytest

from aries_cloudagent.ledger.indy import (
IndyLedger,
GENESIS_TRANSACTION_PATH,
ClosedPoolError,
LedgerTransactionError,
DuplicateSchemaError,
)

from aries_cloudagent.verifier.indy import IndyVerifier


Expand Down

0 comments on commit 0cfd3fa

Please sign in to comment.