-
Notifications
You must be signed in to change notification settings - Fork 516
/
base.py
607 lines (502 loc) · 19.7 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
"""Ledger base class."""
import json
import logging
import re
from abc import ABC, abstractmethod, ABCMeta
from enum import Enum
from hashlib import sha256
from typing import List, Sequence, Tuple, Union
from ..indy.issuer import DEFAULT_CRED_DEF_TAG, IndyIssuer, IndyIssuerError
from ..utils import sentinel
from ..wallet.did_info import DIDInfo
from .error import BadLedgerRequestError, LedgerError, LedgerTransactionError
from .endpoint_type import EndpointType
LOGGER = logging.getLogger(__name__)
class BaseLedger(ABC, metaclass=ABCMeta):
    """Base class for ledger.

    Declares the abstract ledger interface and carries the concrete helpers
    shared by implementations (schema and credential definition publishing,
    TAA digest, DID/nym conversion).
    """

    # Backend identifier returned by the `backend` property; None on the base
    # class (presumably overridden by concrete ledgers - confirm in subclasses).
    BACKEND_NAME: str = None
async def __aenter__(self) -> "BaseLedger":
    """
    Context manager entry.

    The base implementation performs no setup.

    Returns:
        The current instance

    """
    return self
async def __aexit__(self, exc_type, exc, tb):
    """Context manager exit.

    The base implementation does nothing and returns None, so an exception
    raised inside the ``async with`` body is not suppressed.
    """
@property
def backend(self) -> str:
    """Name of the ledger backend, read from the class-level BACKEND_NAME."""
    ledger_cls = self.__class__
    return ledger_cls.BACKEND_NAME
@property
@abstractmethod
def read_only(self) -> bool:
    """Accessor for the ledger read-only flag.

    Abstract: every concrete ledger must provide this property.
    """
@abstractmethod
async def is_ledger_read_only(self) -> bool:
    """Check if ledger is read-only including TAA.

    Returns:
        True when writes must be refused; the schema and credential
        definition publishing helpers of this class raise LedgerError
        when this returns a truthy value

    """
@abstractmethod
async def get_key_for_did(self, did: str) -> str:
    """Fetch the verkey for a ledger DID.

    Args:
        did: The DID to look up on the ledger or in the cache

    Returns:
        The verkey for the DID (a string, per the annotation)

    """
@abstractmethod
async def get_endpoint_for_did(
    self, did: str, endpoint_type: EndpointType = EndpointType.ENDPOINT
) -> str:
    """Fetch the endpoint for a ledger DID.

    Args:
        did: The DID to look up on the ledger or in the cache
        endpoint_type: The type of the endpoint (default 'endpoint')

    Returns:
        The endpoint address (a string, per the annotation)

    """
@abstractmethod
async def get_all_endpoints_for_did(self, did: str) -> dict:
    """Fetch all endpoints for a ledger DID.

    Args:
        did: The DID to look up on the ledger or in the cache

    Returns:
        Dictionary of the endpoints registered for the DID
        (NOTE(review): key layout is defined by the implementation - confirm)

    """
async def _construct_attr_json(
    self,
    endpoint: str,
    endpoint_type: EndpointType = None,
    all_exist_endpoints: dict = None,
    routing_keys: List[str] = None,
) -> str:
    """Create attr_json string.

    Args:
        endpoint: The endpoint address
        endpoint_type: The type of the endpoint; when omitted,
            EndpointType.ENDPOINT is used, matching the default of the
            other endpoint methods on this class
        all_exist_endpoints: Dictionary of all existing endpoints; it is
            copied and never modified in place
        routing_keys: List of routing_keys if mediator is present

    Returns:
        JSON string ``{"endpoint": {...}}`` holding the existing endpoints
        (if any), the new endpoint under its indy type name, and the
        ``routingKeys`` list (empty when no routing keys are given)

    """
    if endpoint_type is None:
        # The signature advertises None as a default; fall back to the
        # plain endpoint type instead of failing on `None.indy`.
        endpoint_type = EndpointType.ENDPOINT

    # Work on a copy so the caller's dictionary is left untouched.
    endpoints = dict(all_exist_endpoints) if all_exist_endpoints else {}
    endpoints[endpoint_type.indy] = endpoint
    endpoints["routingKeys"] = routing_keys or []
    return json.dumps({"endpoint": endpoints})
@abstractmethod
async def update_endpoint_for_did(
    self,
    did: str,
    endpoint: str,
    endpoint_type: EndpointType = EndpointType.ENDPOINT,
    write_ledger: bool = True,
    endorser_did: str = None,
    routing_keys: List[str] = None,
) -> bool:
    """Check and update the endpoint on the ledger.

    Args:
        did: The ledger DID
        endpoint: The endpoint address
        endpoint_type: The type of the endpoint (default 'endpoint')
        write_ledger: NOTE(review): presumably False prepares the
            transaction without writing it, as in the publishing helpers
            of this class - confirm in implementations
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the transaction - confirm in implementations
        routing_keys: List of routing keys (see _construct_attr_json,
            which describes them as present when a mediator is used)

    Returns:
        A boolean result (NOTE(review): meaning is defined by the
        implementation - confirm)

    """
@abstractmethod
async def register_nym(
    self,
    did: str,
    verkey: str,
    alias: str = None,
    role: str = None,
    write_ledger: bool = True,
    endorser_did: str = None,
) -> Tuple[bool, dict]:
    """
    Register a nym on the ledger.

    Args:
        did: DID to register on the ledger.
        verkey: The verification key of the keypair.
        alias: Human-friendly alias to assign to the DID.
        role: For permissioned ledgers, what role should the new DID have.
        write_ledger: NOTE(review): presumably False prepares the
            transaction without writing it - confirm in implementations
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the transaction - confirm in implementations

    Returns:
        A (bool, dict) tuple (NOTE(review): the meaning of both items is
        defined by the implementation - confirm)

    """
@abstractmethod
async def get_nym_role(self, did: str):
    """
    Return the role registered to input public DID on the ledger.

    Args:
        did: DID whose registered role is looked up on the ledger.

    """
@abstractmethod
def nym_to_did(self, nym: str) -> str:
    """Format a nym with the ledger's DID prefix.

    Counterpart of did_to_nym, which strips such a prefix.

    Args:
        nym: The unprefixed identifier

    """
@abstractmethod
async def rotate_public_did_keypair(self, next_seed: str = None) -> None:
    """
    Rotate keypair for public DID: create new key, submit to ledger, update wallet.

    Args:
        next_seed: seed for incoming ed25519 keypair (default random)

    """
def did_to_nym(self, did: str) -> str:
    """Strip a leading ``did:<method>:`` prefix from a DID, yielding the nym.

    Returns None when the input is empty or None; a value without such a
    prefix is returned unchanged.
    """
    if not did:
        return None
    return re.sub(r"^did:\w+:", "", did)
@abstractmethod
async def get_wallet_public_did(self) -> DIDInfo:
    """Fetch the public DID from the wallet.

    Returns:
        The public DID information; the publishing helpers of this class
        treat a falsy result as "no public DID" and raise
        BadLedgerRequestError

    """
@abstractmethod
async def get_txn_author_agreement(self, reload: bool = False):
    """Get the current transaction author agreement, fetching it if necessary.

    Args:
        reload: NOTE(review): presumably forces a fresh fetch instead of
            reusing a previously loaded agreement - confirm in
            implementations

    """
@abstractmethod
async def fetch_txn_author_agreement(self):
    """Fetch the current AML and TAA from the ledger.

    AML and TAA presumably stand for acceptance mechanism list and
    transaction author agreement - confirm against the ledger docs.
    """
@abstractmethod
async def accept_txn_author_agreement(
    self, taa_record: dict, mechanism: str, accept_time: int = None
):
    """Save a new record recording the acceptance of the TAA.

    Args:
        taa_record: The TAA being accepted, as a dictionary
        mechanism: The acceptance mechanism identifier
        accept_time: Acceptance time (NOTE(review): presumably an epoch
            timestamp, defaulting to the current time when None - confirm)

    """
@abstractmethod
async def get_latest_txn_author_acceptance(self):
    """Look up the latest TAA acceptance.

    The return shape is defined by the implementation.
    """
def taa_digest(self, version: str, text: str):
    """Compute the TAA digest: hex SHA-256 of version followed by text (UTF-8).

    Args:
        version: The TAA version string
        text: The TAA text

    Returns:
        The lowercase hexadecimal digest string

    Raises:
        ValueError: if either version or text is empty or None

    """
    if not (version and text):
        raise ValueError("Bad input for TAA digest")
    hasher = sha256()
    hasher.update((version + text).encode("utf-8"))
    return hasher.hexdigest()
@abstractmethod
async def txn_endorse(
    self,
    request_json: str,
    endorse_did: DIDInfo = None,
) -> str:
    """Endorse (sign) the provided transaction.

    Args:
        request_json: The transaction request, as a JSON string
        endorse_did: DID information to endorse with (NOTE(review):
            presumably the wallet's public DID is used when None - confirm)

    Returns:
        The endorsed transaction as a string

    """
@abstractmethod
async def txn_submit(
    self,
    request_json: str,
    sign: bool,
    taa_accept: bool = None,
    sign_did: DIDInfo = sentinel,
    write_ledger: bool = True,
) -> str:
    """Write the provided (signed and possibly endorsed) transaction to the ledger.

    Args:
        request_json: The transaction request, as a JSON string
        sign: Whether to sign the request
        taa_accept: NOTE(review): presumably whether to attach the TAA
            acceptance to the request - confirm in implementations
        sign_did: DID information to sign with; the default is the
            `sentinel` object rather than None (NOTE(review): presumably
            so that an explicit None can be told apart from "not given")
        write_ledger: Whether to write to the ledger; the publishing
            helpers of this class treat the return value as the signed
            transaction when this is False

    Returns:
        The response as a string; the schema publishing helper parses it
        as JSON when write_ledger is True

    """
@abstractmethod
async def fetch_schema_by_id(self, schema_id: str) -> dict:
    """
    Get schema from ledger.

    Args:
        schema_id: The schema id (or stringified sequence number) to retrieve

    Returns:
        Indy schema dict; check_existing_schema treats a falsy result as
        "schema not found"

    """
@abstractmethod
async def fetch_schema_by_seq_no(self, seq_no: int) -> dict:
    """
    Fetch a schema by its sequence number.

    Args:
        seq_no: schema ledger sequence number

    Returns:
        Indy schema dict

    """
async def check_existing_schema(
    self,
    public_did: str,
    schema_name: str,
    schema_version: str,
    attribute_names: Sequence[str],
) -> Tuple[str, dict]:
    """Look for an already published schema with the given name and version.

    Args:
        public_did: The DID the schema id is built from
        schema_name: The schema name
        schema_version: The schema version
        attribute_names: The attribute names the schema is expected to have

    Returns:
        Tuple of schema id and the fetched schema when one exists with the
        same attribute names (order-insensitive); None when no schema is
        found

    Raises:
        LedgerTransactionError: if the schema exists with different
            attribute names

    """
    candidate_id = f"{public_did}:2:{schema_name}:{schema_version}"
    ledger_schema = await self.fetch_schema_by_id(candidate_id)
    if not ledger_schema:
        return None

    # Compare sorted copies so ordering differences are ignored and neither
    # the fetched schema nor the caller's sequence is modified.
    on_ledger = sorted(ledger_schema["attrNames"])
    requested = sorted(attribute_names)
    if on_ledger == requested:
        return candidate_id, ledger_schema

    raise LedgerTransactionError(
        "Schema already exists on ledger, but attributes do not match: "
        + f"{schema_name}:{schema_version} {on_ledger} != {requested}"
    )
async def create_and_send_schema(
    self,
    issuer: IndyIssuer,
    schema_name: str,
    schema_version: str,
    attribute_names: Sequence[str],
    write_ledger: bool = True,
    endorser_did: str = None,
) -> Tuple[str, dict]:
    """
    Send schema to ledger.

    A schema that is already published with the same attributes is returned
    as-is instead of being sent again.

    Args:
        issuer: The issuer instance to use for schema creation
        schema_name: The schema name
        schema_version: The schema version
        attribute_names: A list of schema attributes
        write_ledger: Passed on to the request builder and to txn_submit;
            when False, the submit response is returned as
            ``{"signed_txn": <response>}`` instead of the schema definition
        endorser_did: Passed on to the request builder

    Returns:
        Tuple of schema id and schema definition (with ``seqNo`` set from
        the ledger response after a successful write)

    Raises:
        BadLedgerRequestError: if the wallet has no public DID
        LedgerError: if the ledger is read-only, the issuer fails to create
            the schema, or the ledger response cannot be parsed
        LedgerTransactionError: if the ledger rejects the transaction and
            no matching schema can be found on the ledger afterwards

    """
    public_info = await self.get_wallet_public_did()
    if not public_info:
        raise BadLedgerRequestError("Cannot publish schema without a public DID")

    schema_info = await self.check_existing_schema(
        public_info.did, schema_name, schema_version, attribute_names
    )
    if schema_info:
        LOGGER.warning("Schema already exists on ledger. Returning details.")
        schema_id, schema_def = schema_info
    else:
        if await self.is_ledger_read_only():
            raise LedgerError(
                "Error cannot write schema when ledger is in read only mode"
            )

        try:
            schema_id, schema_json = await issuer.create_schema(
                public_info.did,
                schema_name,
                schema_version,
                attribute_names,
            )
        except IndyIssuerError as err:
            raise LedgerError(err.message) from err
        schema_def = json.loads(schema_json)

        schema_req = await self._create_schema_request(
            public_info,
            schema_json,
            write_ledger=write_ledger,
            endorser_did=endorser_did,
        )

        try:
            resp = await self.txn_submit(
                schema_req,
                sign=True,
                sign_did=public_info,
                write_ledger=write_ledger,
            )

            if not write_ledger:
                return schema_id, {"signed_txn": resp}

            try:
                # parse sequence number out of response
                seq_no = json.loads(resp)["result"]["txnMetadata"]["seqNo"]
                schema_def["seqNo"] = seq_no
            except (KeyError, TypeError, ValueError) as err:
                # ValueError covers invalid JSON, TypeError a response whose
                # layers are not dictionaries.
                raise LedgerError(
                    "Failed to parse schema sequence number from ledger response"
                ) from err
        except LedgerTransactionError as e:
            # Identify possible duplicate schema errors on indy-node < 1.9 and > 1.9
            looks_like_duplicate = (
                "can have one and only one SCHEMA with name" in e.message
                or "UnauthorizedClientRequest" in e.message
            )
            if not looks_like_duplicate:
                raise

            # handle potential race condition if multiple agents are publishing
            # the same schema simultaneously
            schema_info = await self.check_existing_schema(
                public_info.did, schema_name, schema_version, attribute_names
            )
            if not schema_info:
                # The rejection was not a duplicate after all (for example a
                # genuine authorization failure): surface it rather than
                # returning a schema that was never written.
                raise
            LOGGER.warning(
                "Schema already exists on ledger. Returning details."
                " Error: %s",
                e,
            )
            schema_id, schema_def = schema_info

    return schema_id, schema_def
@abstractmethod
async def _create_schema_request(
    self,
    public_info: DIDInfo,
    schema_json: str,
    write_ledger: bool = True,
    endorser_did: str = None,
):
    """Create the ledger request for publishing a schema.

    Args:
        public_info: The public DID information of the schema author
        schema_json: The schema, as a JSON string
        write_ledger: Same flag that create_and_send_schema later passes to
            txn_submit
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the request - confirm in implementations

    Returns:
        The request that create_and_send_schema hands to txn_submit

    """
@abstractmethod
async def get_revoc_reg_def(self, revoc_reg_id: str) -> dict:
    """Look up a revocation registry definition by ID.

    Args:
        revoc_reg_id: The revocation registry ID

    Returns:
        The revocation registry definition as a dictionary

    """
@abstractmethod
async def send_revoc_reg_def(
    self,
    revoc_reg_def: dict,
    issuer_did: str = None,
    write_ledger: bool = True,
    endorser_did: str = None,
) -> dict:
    """Publish a revocation registry definition to the ledger.

    Args:
        revoc_reg_def: The revocation registry definition
        issuer_did: NOTE(review): presumably the issuing DID, with the
            wallet's public DID used when None - confirm in implementations
        write_ledger: NOTE(review): presumably False prepares the
            transaction without writing it - confirm in implementations
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the transaction - confirm in implementations

    Returns:
        A dictionary (NOTE(review): contents are defined by the
        implementation - confirm)

    """
@abstractmethod
async def send_revoc_reg_entry(
    self,
    revoc_reg_id: str,
    revoc_def_type: str,
    revoc_reg_entry: dict,
    issuer_did: str = None,
    write_ledger: bool = True,
    endorser_did: str = None,
) -> dict:
    """Publish a revocation registry entry to the ledger.

    Args:
        revoc_reg_id: The revocation registry ID
        revoc_def_type: The revocation registry definition type
        revoc_reg_entry: The revocation registry entry
        issuer_did: NOTE(review): presumably the issuing DID, with the
            wallet's public DID used when None - confirm in implementations
        write_ledger: NOTE(review): presumably False prepares the
            transaction without writing it - confirm in implementations
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the transaction - confirm in implementations

    Returns:
        A dictionary (NOTE(review): contents are defined by the
        implementation - confirm)

    """
async def create_and_send_credential_definition(
    self,
    issuer: IndyIssuer,
    schema_id: str,
    signature_type: str = None,
    tag: str = None,
    support_revocation: bool = False,
    write_ledger: bool = True,
    endorser_did: str = None,
) -> Tuple[str, dict, bool]:
    """
    Send credential definition to ledger and store relevant key matter in wallet.

    A credential definition that is already on the ledger and in the wallet
    is returned as-is (novel is False). Otherwise a new one is created,
    stored in the wallet and submitted (novel is True).

    Args:
        issuer: The issuer instance to use for credential definition creation
        schema_id: The schema id of the schema to create cred def for
        signature_type: The signature type to use on the credential definition
        tag: Optional tag to distinguish multiple credential definitions
        support_revocation: Optional flag to enable revocation for this cred def
        write_ledger: Passed on to the request builder and to txn_submit;
            when False, the submit response is returned as
            ``{"signed_txn": <response>}`` in place of the cred def structure
        endorser_did: Passed on to the request builder

    Returns:
        Tuple with cred def id, cred def structure, and whether it's novel

    Raises:
        BadLedgerRequestError: if the wallet has no public DID
        LedgerError: if the schema is unknown, ledger and wallet disagree
            about the credential definition, the ledger is read-only, or
            the issuer reports an error

    """
    public_info = await self.get_wallet_public_did()
    if not public_info:
        raise BadLedgerRequestError(
            "Cannot publish credential definition without a public DID"
        )

    schema = await self.get_schema(schema_id)
    if not schema:
        raise LedgerError(f"Ledger {self.pool_name} has no schema {schema_id}")

    novel = False

    # check if cred def is on ledger already
    for test_tag in [tag] if tag else ["tag", DEFAULT_CRED_DEF_TAG]:
        credential_definition_id = issuer.make_credential_definition_id(
            public_info.did, schema, signature_type, test_tag
        )
        ledger_cred_def = await self.fetch_credential_definition(
            credential_definition_id
        )
        if ledger_cred_def:
            LOGGER.warning(
                "Credential definition %s already exists on ledger %s",
                credential_definition_id,
                self.pool_name,
            )

            try:
                if not await issuer.credential_definition_in_wallet(
                    credential_definition_id
                ):
                    raise LedgerError(
                        f"Credential definition {credential_definition_id} is on "
                        f"ledger {self.pool_name} but not in wallet "
                        f"{self.profile.name}"
                    )
            except IndyIssuerError as err:
                raise LedgerError(err.message) from err

            credential_definition_json = json.dumps(ledger_cred_def)
            break
    else:  # no such cred def on ledger
        # The id checked here is the one built for the last candidate tag.
        try:
            if await issuer.credential_definition_in_wallet(
                credential_definition_id
            ):
                raise LedgerError(
                    f"Credential definition {credential_definition_id} is in "
                    f"wallet {self.profile.name} but not on ledger "
                    f"{self.pool_name}"
                )
        except IndyIssuerError as err:
            raise LedgerError(err.message) from err

        # Refuse before touching the wallet: storing the cred def and then
        # failing would leave it "in wallet but not on ledger", which the
        # check above rejects on every later attempt.
        if await self.is_ledger_read_only():
            raise LedgerError(
                "Error cannot write cred def when ledger is in read only mode"
            )

        # Cred def is neither on ledger nor in wallet: create and send it
        novel = True
        try:
            (
                credential_definition_id,
                credential_definition_json,
            ) = await issuer.create_and_store_credential_definition(
                public_info.did,
                schema,
                signature_type,
                tag,
                support_revocation,
            )
        except IndyIssuerError as err:
            raise LedgerError(err.message) from err

        cred_def_req = await self._create_credential_definition_request(
            public_info,
            credential_definition_json,
            write_ledger=write_ledger,
            endorser_did=endorser_did,
        )

        resp = await self.txn_submit(
            cred_def_req, True, sign_did=public_info, write_ledger=write_ledger
        )
        if not write_ledger:
            return (credential_definition_id, {"signed_txn": resp}, novel)

    return (credential_definition_id, json.loads(credential_definition_json), novel)
@abstractmethod
async def _create_credential_definition_request(
    self,
    public_info: DIDInfo,
    credential_definition_json: str,
    write_ledger: bool = True,
    endorser_did: str = None,
):
    """Create the ledger request for publishing a credential definition.

    Args:
        public_info: The public DID information of the cred def author
        credential_definition_json: The credential definition, as a JSON
            string
        write_ledger: Same flag that create_and_send_credential_definition
            later passes to txn_submit
        endorser_did: NOTE(review): presumably the DID of the endorser for
            the request - confirm in implementations

    Returns:
        The request that create_and_send_credential_definition hands to
        txn_submit

    """
@abstractmethod
async def get_credential_definition(self, credential_definition_id: str) -> dict:
    """
    Get a credential definition from the cache if available, otherwise the ledger.

    Args:
        credential_definition_id: The id of the credential definition to fetch

    Returns:
        The credential definition as a dictionary

    """
@abstractmethod
async def get_revoc_reg_delta(
    self, revoc_reg_id: str, timestamp_from=0, timestamp_to=None
) -> Tuple[dict, int]:
    """Look up a revocation registry delta by ID.

    Args:
        revoc_reg_id: The revocation registry ID
        timestamp_from: Start of the interval (default 0)
        timestamp_to: End of the interval (NOTE(review): presumably "now"
            when None - confirm in implementations)

    Returns:
        A (dict, int) tuple (NOTE(review): presumably the delta and a
        timestamp - confirm in implementations)

    """
@abstractmethod
async def get_schema(self, schema_id: str) -> dict:
    """
    Get a schema from the cache if available, otherwise fetch from the ledger.

    Args:
        schema_id: The schema id (or stringified sequence number) to retrieve

    Returns:
        The schema as a dictionary; create_and_send_credential_definition
        treats a falsy result as "schema not on ledger"

    """
@abstractmethod
async def get_revoc_reg_entry(
    self, revoc_reg_id: str, timestamp: int
) -> Tuple[dict, int]:
    """Get revocation registry entry by revocation registry ID and timestamp.

    Args:
        revoc_reg_id: The revocation registry ID
        timestamp: The timestamp to look the entry up at

    Returns:
        A (dict, int) tuple (NOTE(review): presumably the entry and a
        timestamp - confirm in implementations)

    """
class Role(Enum):
    """Indy roles, each mapped to the tuple of raw values that identify it."""

    STEWARD = (2,)
    TRUSTEE = (0,)
    ENDORSER = (101,)
    NETWORK_MONITOR = (201,)
    USER = (None, "")  # in case reading from file, default empty "" or None for USER
    ROLE_REMOVE = ("",)  # but indy-sdk uses "" to identify a role in reset

    @staticmethod
    def get(token: Union[str, int] = None) -> "Role":
        """
        Resolve a token to its role.

        A token matches a role by integer value, by case-insensitive role
        name, or by the string form of one of the role's values. ROLE_REMOVE
        is never returned.

        Args:
            token: token identifying role to indy-sdk:
                "STEWARD", "TRUSTEE", "ENDORSER", "" or None

        Returns:
            The matching role, Role.USER for None, or None when nothing
            matches

        """
        if token is None:
            return Role.USER

        is_numeric = isinstance(token, int)
        wanted_name = str(token).upper()
        for candidate in Role:
            if candidate == Role.ROLE_REMOVE:
                continue  # not a sensible role to parse from any configuration
            raw_values = candidate.value
            if is_numeric and token in raw_values:
                return candidate
            if wanted_name == candidate.name:
                return candidate
            if token in [str(raw) for raw in raw_values]:
                return candidate

        return None

    def to_indy_num_str(self) -> str:
        """
        Return (typically, numeric) string value that indy-sdk associates with role.

        Integer values are stringified; USER yields None and ROLE_REMOVE
        yields "" (the role undergoing reset).
        """
        primary = self.value[0]
        if isinstance(primary, int):
            return str(primary)
        return primary

    def token(self) -> str:
        """Return token identifying role to indy-sdk.

        This is the role name, except for USER and ROLE_REMOVE, which yield
        their first raw value (None and "" respectively).
        """
        if self in (Role.USER, Role.ROLE_REMOVE):
            return self.value[0]
        return self.name