-
Notifications
You must be signed in to change notification settings - Fork 516
/
base_manager.py
436 lines (378 loc) · 15.8 KB
/
base_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
"""
Common utilities for connection handling.

Shared by the Connection, DIDExchange and OutOfBand managers.
"""
import logging
from typing import List, Sequence, Tuple
from pydid import (
BaseDIDDocument as ResolvedDocument,
DIDCommService,
VerificationMethod,
)
import pydid
from pydid.verification_method import Ed25519VerificationKey2018
from ..core.error import BaseError
from ..core.profile import Profile
from ..did.did_key import DIDKey
from ..protocols.connections.v1_0.messages.connection_invitation import (
ConnectionInvitation,
)
from ..protocols.coordinate_mediation.v1_0.models.mediation_record import (
MediationRecord,
)
from ..protocols.coordinate_mediation.v1_0.route_manager import (
RouteManager,
)
from ..resolver.base import ResolverError
from ..resolver.did_resolver import DIDResolver
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..storage.record import StorageRecord
from ..wallet.base import BaseWallet
from ..wallet.did_info import DIDInfo
from .models.conn_record import ConnRecord
from .models.connection_target import ConnectionTarget
from .models.diddoc import DIDDoc, PublicKey, PublicKeyType, Service
class BaseConnectionManagerError(BaseError):
    """Raised when a BaseConnectionManager operation cannot be completed."""
class BaseConnectionManager:
    """Class to provide utilities regarding connection_targets.

    Bundles the helpers needed to build and persist DID documents, to keep a
    verkey -> DID lookup in storage, to resolve DIDs found in invitations and
    to turn a connection record into a list of `ConnectionTarget` instances.
    """

    # Storage record types used for persisted DID documents and for the
    # verkey -> DID lookup records.
    RECORD_TYPE_DID_DOC = "did_doc"
    RECORD_TYPE_DID_KEY = "did_key"
    # Verification method classes accepted by `resolve_invitation`.
    SUPPORTED_KEY_TYPES = (Ed25519VerificationKey2018,)

    def __init__(self, profile: Profile):
        """
        Initialize a BaseConnectionManager.

        Args:
            profile: The profile used to open sessions and to inject the
                `RouteManager`
        """
        self._logger = logging.getLogger(__name__)
        self._profile = profile
        self._route_manager = profile.inject(RouteManager)

    async def create_did_document(
        self,
        did_info: DIDInfo,
        inbound_connection_id: str = None,
        svc_endpoints: Sequence[str] = None,
        mediation_records: List[MediationRecord] = None,
    ) -> DIDDoc:
        """Create our DID doc for a given DID.

        The document receives one Ed25519 public key (ident ``"1"``) for the
        DID's verkey and one ``IndyAgent`` service per endpoint. Routing keys
        are collected first from the chain of inbound routing connections and
        then from the mediation records; either source replaces
        `svc_endpoints` with its own endpoint.

        Args:
            did_info: The DID information (DID and verkey) used in the connection
            inbound_connection_id: The ID of the inbound routing connection to use
            svc_endpoints: Custom endpoints for the DID Document
            mediation_records: The records for mediation that contain
                routing_keys and service endpoint

        Returns:
            The prepared `DIDDoc` instance

        Raises:
            BaseConnectionManagerError: If a routing connection is not
                completed, or its DID document has no services, no service
                endpoint or no recipient keys

        """

        did_doc = DIDDoc(did=did_info.did)
        did_controller = did_info.did
        did_key = did_info.verkey
        pk = PublicKey(
            did_info.did,
            "1",
            did_key,
            PublicKeyType.ED25519_SIG_2018,
            did_controller,
            True,
        )
        did_doc.set(pk)

        router_id = inbound_connection_id
        routing_keys = []
        router_idx = 1
        while router_id:
            # look up routing connection information
            async with self._profile.session() as session:
                router = await ConnRecord.retrieve_by_id(session, router_id)
            if ConnRecord.State.get(router.state) != ConnRecord.State.COMPLETED:
                raise BaseConnectionManagerError(
                    f"Router connection not completed: {router_id}"
                )
            routing_doc, _ = await self.fetch_did_document(router.their_did)
            if not routing_doc.service:
                raise BaseConnectionManagerError(
                    f"No services defined by routing DIDDoc: {router_id}"
                )
            # Only the first service of the routing DID document is used.
            for service in routing_doc.service.values():
                if not service.endpoint:
                    raise BaseConnectionManagerError(
                        "Routing DIDDoc service has no service endpoint"
                    )
                if not service.recip_keys:
                    raise BaseConnectionManagerError(
                        "Routing DIDDoc service has no recipient key(s)"
                    )
                rk = PublicKey(
                    did_info.did,
                    f"routing-{router_idx}",
                    service.recip_keys[0].value,
                    PublicKeyType.ED25519_SIG_2018,
                    did_controller,
                    True,
                )
                routing_keys.append(rk)
                svc_endpoints = [service.endpoint]
                break
            # Give the next hop its own key ident; previously the counter was
            # never advanced and every hop was labelled "routing-1".
            router_idx += 1
            router_id = router.inbound_connection_id

        if mediation_records:
            for mediation_record in mediation_records:
                mediator_routing_keys = [
                    PublicKey(
                        did_info.did,
                        f"routing-{idx}",
                        key,
                        PublicKeyType.ED25519_SIG_2018,
                        did_controller,  # TODO: get correct controller did_info
                        True,  # TODO: should this be true?
                    )
                    for idx, key in enumerate(mediation_record.routing_keys)
                ]

                routing_keys = [*routing_keys, *mediator_routing_keys]
                # The endpoint of the last mediation record wins.
                svc_endpoints = [mediation_record.endpoint]

        for endpoint_index, svc_endpoint in enumerate(svc_endpoints or []):
            endpoint_ident = "indy" if endpoint_index == 0 else f"indy{endpoint_index}"
            service = Service(
                did_info.did,
                endpoint_ident,
                "IndyAgent",
                [pk],
                routing_keys,
                svc_endpoint,
            )
            did_doc.set(service)

        return did_doc

    async def store_did_document(self, did_doc: DIDDoc):
        """Store a DID document.

        Adds a new storage record, or updates the existing one for the same
        DID, then rebuilds the verkey -> DID lookup records from the public
        keys controlled by the document's own DID.

        Args:
            did_doc: The `DIDDoc` instance to persist
        """
        # NOTE(review): `assert` is stripped under `python -O`; kept as-is so
        # the exception type seen by callers does not change.
        assert did_doc.did

        try:
            _, record = await self.fetch_did_document(did_doc.did)
        except StorageNotFoundError:
            record = StorageRecord(
                self.RECORD_TYPE_DID_DOC,
                did_doc.to_json(),
                {"did": did_doc.did},
            )
            async with self._profile.session() as session:
                storage: BaseStorage = session.inject(BaseStorage)
                await storage.add_record(record)
        else:
            async with self._profile.session() as session:
                storage: BaseStorage = session.inject(BaseStorage)
                await storage.update_record(
                    record, did_doc.to_json(), {"did": did_doc.did}
                )

        # Drop stale lookup records before re-adding the current keys.
        await self.remove_keys_for_did(did_doc.did)
        for key in did_doc.pubkey.values():
            if key.controller == did_doc.did:
                await self.add_key_for_did(did_doc.did, key.value)

    async def add_key_for_did(self, did: str, key: str):
        """Store a verkey for lookup against a DID.

        Args:
            did: The DID to associate with this key
            key: The verkey to be added
        """
        record = StorageRecord(self.RECORD_TYPE_DID_KEY, key, {"did": did, "key": key})
        async with self._profile.session() as session:
            storage: BaseStorage = session.inject(BaseStorage)
            await storage.add_record(record)

    async def find_did_for_key(self, key: str) -> str:
        """Find the DID previously associated with a key.

        Args:
            key: The verkey to look up

        Returns:
            The value of the ``did`` tag on the matching lookup record
        """
        async with self._profile.session() as session:
            storage: BaseStorage = session.inject(BaseStorage)
            record = await storage.find_record(self.RECORD_TYPE_DID_KEY, {"key": key})
        return record.tags["did"]

    async def remove_keys_for_did(self, did: str):
        """Remove all keys associated with a DID.

        Args:
            did: The DID for which to remove keys
        """
        async with self._profile.session() as session:
            storage: BaseStorage = session.inject(BaseStorage)
            await storage.delete_all_records(self.RECORD_TYPE_DID_KEY, {"did": did})

    async def resolve_invitation(self, did: str):
        """
        Resolve invitation with the DID Resolver.

        Args:
            did: Document ID to resolve

        Returns:
            A tuple of the service endpoint, the recipient key material and
            the routing key material taken from the resolved document's
            lowest-priority-value DIDComm service

        Raises:
            BaseConnectionManagerError: If the DID cannot be resolved, the
                document has no (DIDComm) services, or a referenced key is
                not one of `SUPPORTED_KEY_TYPES`
        """
        if not did.startswith("did:"):
            # DID is bare indy "nym"
            # prefix with did:sov: for backwards compatibility
            did = f"did:sov:{did}"

        resolver = self._profile.inject(DIDResolver)
        try:
            doc_dict: dict = await resolver.resolve(self._profile, did)
            doc: ResolvedDocument = pydid.deserialize_document(doc_dict, strict=True)
        except ResolverError as error:
            raise BaseConnectionManagerError(
                "Failed to resolve public DID in invitation"
            ) from error

        if not doc.service:
            raise BaseConnectionManagerError(
                "Cannot connect via public DID that has no associated services"
            )

        didcomm_services = [
            service for service in doc.service if isinstance(service, DIDCommService)
        ]
        if not didcomm_services:
            raise BaseConnectionManagerError(
                "Cannot connect via public DID that has no associated DIDComm services"
            )

        # `min` returns the first item with the smallest priority, which is
        # the same element a stable sort would place first.
        first_didcomm_service = min(
            didcomm_services, key=lambda service: service.priority
        )

        endpoint = first_didcomm_service.service_endpoint
        recipient_keys: List[VerificationMethod] = [
            doc.dereference(url) for url in first_didcomm_service.recipient_keys
        ]
        routing_keys: List[VerificationMethod] = [
            doc.dereference(url) for url in first_didcomm_service.routing_keys
        ]

        for key in [*recipient_keys, *routing_keys]:
            if not isinstance(key, self.SUPPORTED_KEY_TYPES):
                raise BaseConnectionManagerError(
                    f"Key type {key.type} is not supported"
                )

        return (
            endpoint,
            [key.material for key in recipient_keys],
            [key.material for key in routing_keys],
        )

    async def fetch_connection_targets(
        self, connection: ConnRecord
    ) -> Sequence[ConnectionTarget]:
        """Get a list of connection targets from a `ConnRecord`.

        While the connection is still in the invitation or request state and
        the other party is the responder, the single target is derived from
        the invitation (or from resolving their DID). Otherwise the targets
        come from the stored DID document for their DID.

        Args:
            connection: The connection record (with associated `DIDDoc`)
                used to generate the connection target

        Returns:
            The connection targets, or `None` when the record has no local
            DID or (outside the invitation/request case) no target DID
        """

        if not connection.my_did:
            self._logger.debug("No local DID associated with connection")
            return None

        results = None
        if (
            ConnRecord.State.get(connection.state)
            in (ConnRecord.State.INVITATION, ConnRecord.State.REQUEST)
            and ConnRecord.Role.get(connection.their_role) is ConnRecord.Role.RESPONDER
        ):
            if (
                connection.invitation_msg_id
                or connection.invitation_key
                or not connection.their_did
            ):
                async with self._profile.session() as session:
                    invitation = await connection.retrieve_invitation(session)
                if isinstance(
                    invitation, ConnectionInvitation
                ):  # conn protocol invitation
                    if invitation.did:
                        did = invitation.did
                        (
                            endpoint,
                            recipient_keys,
                            routing_keys,
                        ) = await self.resolve_invitation(did)
                    else:
                        endpoint = invitation.endpoint
                        recipient_keys = invitation.recipient_keys
                        routing_keys = invitation.routing_keys
                else:  # out-of-band invitation
                    oob_service_item = invitation.services[0]
                    if isinstance(oob_service_item, str):
                        # A string service entry is resolved as a DID.
                        (
                            endpoint,
                            recipient_keys,
                            routing_keys,
                        ) = await self.resolve_invitation(oob_service_item)
                    else:
                        endpoint = oob_service_item.service_endpoint
                        recipient_keys = [
                            DIDKey.from_did(k).public_key_b58
                            for k in oob_service_item.recipient_keys
                        ]
                        routing_keys = [
                            DIDKey.from_did(k).public_key_b58
                            for k in oob_service_item.routing_keys
                        ]
            else:
                # This branch is only reached when `their_did` is set (the
                # condition above is true whenever it is missing), so the
                # target values are always bound below.
                invitation = None
                did = connection.their_did
                (
                    endpoint,
                    recipient_keys,
                    routing_keys,
                ) = await self.resolve_invitation(did)

            async with self._profile.session() as session:
                wallet = session.inject(BaseWallet)
                my_info = await wallet.get_local_did(connection.my_did)

            results = [
                ConnectionTarget(
                    did=connection.their_did,
                    endpoint=endpoint,
                    label=invitation.label if invitation else None,
                    recipient_keys=recipient_keys,
                    routing_keys=routing_keys,
                    sender_key=my_info.verkey,
                )
            ]
        else:
            if not connection.their_did:
                self._logger.debug("No target DID associated with connection")
                return None

            did_doc, _ = await self.fetch_did_document(connection.their_did)
            async with self._profile.session() as session:
                wallet = session.inject(BaseWallet)
                my_info = await wallet.get_local_did(connection.my_did)
            results = self.diddoc_connection_targets(
                did_doc, my_info.verkey, connection.their_label
            )

        return results

    def diddoc_connection_targets(
        self, doc: DIDDoc, sender_verkey: str, their_label: str = None
    ) -> Sequence[ConnectionTarget]:
        """Get a list of connection targets from a DID Document.

        Services without recipient keys are skipped.

        Args:
            doc: The DID Document to create the target from
            sender_verkey: The verkey we are using
            their_label: The connection label they are using

        Returns:
            One `ConnectionTarget` per service that has recipient keys

        Raises:
            BaseConnectionManagerError: If no document is given, or it has
                no DID or no services
        """

        if not doc:
            raise BaseConnectionManagerError("No DIDDoc provided for connection target")
        if not doc.did:
            raise BaseConnectionManagerError("DIDDoc has no DID")
        if not doc.service:
            raise BaseConnectionManagerError("No services defined by DIDDoc")

        return [
            ConnectionTarget(
                did=doc.did,
                endpoint=service.endpoint,
                label=their_label,
                recipient_keys=[key.value for key in service.recip_keys],
                routing_keys=[key.value for key in (service.routing_keys or ())],
                sender_key=sender_verkey,
            )
            for service in doc.service.values()
            if service.recip_keys
        ]

    async def fetch_did_document(self, did: str) -> Tuple[DIDDoc, StorageRecord]:
        """Retrieve a DID Document for a given DID.

        Args:
            did: The DID to search for

        Returns:
            The deserialized `DIDDoc` and the storage record it was read from
        """
        async with self._profile.session() as session:
            storage = session.inject(BaseStorage)
            record = await storage.find_record(self.RECORD_TYPE_DID_DOC, {"did": did})
        return DIDDoc.from_json(record.value), record