-
Notifications
You must be signed in to change notification settings - Fork 9
/
jsonld.py
137 lines (122 loc) · 4.94 KB
/
jsonld.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from aries_cloudcontroller import Doc, SignRequest, SignResponse, VerifyRequest
from fastapi import APIRouter, Depends
from app.dependencies.acapy_clients import client_from_auth
from app.dependencies.auth import AcaPyAuth, acapy_auth_from_header
from app.exceptions import CloudApiException, handle_acapy_call
from app.models.jsonld import JsonLdSignRequest, JsonLdVerifyRequest
from shared.log_config import get_logger
# Module-level logger, obtained from the shared logging config under this module's name.
logger = get_logger(__name__)
# Router for the JSON-LD endpoints: route paths are prefixed with /v1/jsonld and tagged "jsonld".
router = APIRouter(prefix="/v1/jsonld", tags=["jsonld"])
@router.post("/sign", response_model=SignResponse)
async def sign_jsonld(
    body: JsonLdSignRequest,
    auth: AcaPyAuth = Depends(acapy_auth_from_header),
):
    """
    Sign a JSON-LD structure

    The signing verkey is taken from `body.verkey` when supplied. Otherwise it
    is looked up on the ledger for `body.pub_did`, falling back to the wallet's
    public DID when no DID is supplied either.

    The document to sign is `body.credential` when supplied; otherwise the
    credential record identified by `body.credential_id` is fetched from the
    wallet and converted to a dict.

    Parameters:
        body: The sign request (verkey / pub_did, credential / credential_id,
            and signature options).
        auth: Authentication details resolved from the request header.

    Returns:
        The result of the JSON-LD sign call.

    Raises:
        CloudApiException: 400 if no verkey is given and the wallet has no
            public DID; 400 if neither a credential nor a credential ID is
            given; 500 if no verkey could be obtained for the DID.
    """
    bound_logger = logger.bind(
        # Do not log credential data. `exclude` takes a set of field names; a
        # bare string is not one of the types `model_dump` documents.
        body=body.model_dump(exclude={"credential"})
    )
    bound_logger.debug("POST request received: Sign JsonLD")

    async with client_from_auth(auth) as aries_controller:
        if body.verkey:
            verkey = body.verkey
        else:
            if body.pub_did:
                pub_did = body.pub_did
            else:
                # No verkey and no DID in the request: use the wallet's public DID.
                bound_logger.debug("Fetching public DID")
                did_response = await handle_acapy_call(
                    logger=bound_logger,
                    acapy_call=aries_controller.wallet.get_public_did,
                )
                if not did_response.result or not did_response.result.did:
                    raise CloudApiException(
                        "Client requires a public did if verkey is not provided.", 400
                    )
                pub_did = did_response.result.did

            bound_logger.debug("Fetching verkey for DID")
            verkey_response = await handle_acapy_call(
                logger=bound_logger,
                acapy_call=aries_controller.ledger.get_did_verkey,
                did=pub_did,
            )
            verkey = verkey_response.verkey
            if not verkey:
                raise CloudApiException(
                    "verkey was not provided and could not be obtained from the pub_did.",
                    500,
                )

        if body.credential:
            credential = body.credential
        elif body.credential_id:
            # NOTE(review): it is unclear whether this wallet lookup can ever
            # return a JSON-LD credential (the record is named as an indy
            # credential) - confirm that signing its dict form is meaningful.
            bound_logger.debug("Fetching credential from wallet")
            indy_cred_info = await handle_acapy_call(
                logger=bound_logger,
                acapy_call=aries_controller.credentials.get_record,
                credential_id=body.credential_id,
            )
            credential = indy_cred_info.to_dict()
        else:
            # This is already handled in JsonLdSignRequest model validation
            raise CloudApiException(
                "Neither a credential nor a credential ID is provided.", 400
            )

        bound_logger.debug("Signing JsonLD")
        request_body = SignRequest(
            doc=Doc(credential=credential, options=body.signature_options),
            verkey=verkey,
        )
        result = await handle_acapy_call(
            logger=bound_logger,
            acapy_call=aries_controller.jsonld.sign,
            body=request_body,
        )

    if result:
        bound_logger.debug("Successfully signed JsonLD.")
    else:
        bound_logger.warning("No result from signing JsonLD.")
    return result
@router.post("/verify", status_code=204)
async def verify_jsonld(
    body: JsonLdVerifyRequest,
    auth: AcaPyAuth = Depends(acapy_auth_from_header),
) -> None:
    """
    Verify a JSON-LD structure

    Uses `body.verkey` when it is set; otherwise the verkey is looked up on
    the ledger for `body.public_did`. The document in `body.doc` is then
    verified against that verkey.

    Parameters:
        body: The verify request (doc plus either verkey or public_did).
        auth: Authentication details resolved from the request header.

    Returns:
        None. The route is declared with status code 204.

    Raises:
        CloudApiException: 500 if no verkey could be obtained for the DID;
            422 if the verification response is not valid.
    """
    # NOTE(review): the whole request body, including `doc`, is bound to the
    # log context here - confirm that logging the document is intended.
    log = logger.bind(body=body)
    log.debug("POST request received: Verify JsonLD")

    async with client_from_auth(auth) as aries_controller:
        # Prefer the verkey from the request; resolve it from the DID otherwise.
        verkey = body.verkey
        if not verkey:
            log.debug("Fetching verkey for DID")
            ledger_lookup = await handle_acapy_call(
                logger=log,
                acapy_call=aries_controller.ledger.get_did_verkey,
                did=body.public_did,
            )
            verkey = ledger_lookup.verkey
            if not verkey:
                raise CloudApiException(
                    "verkey was not provided and could not be obtained from the pub_did.",
                    500,
                )

        log.debug("Verifying JsonLD")
        jsonld_verify_response = await handle_acapy_call(
            logger=log,
            acapy_call=aries_controller.jsonld.verify,
            body=VerifyRequest(doc=body.doc, verkey=verkey),
        )

    if jsonld_verify_response.valid:
        log.debug("Successfully verified JsonLD.")
        return

    raise CloudApiException(
        f"Failed to verify payload with error message: `{jsonld_verify_response.error}`.",
        422,
    )