forked from openwallet-foundation/acapy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request openwallet-foundation#3261 from OpSecId/data-integ…
…rity-routes Data integrity routes
- Loading branch information
Showing
18 changed files
with
1,399 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from .eddsa_jcs_2022 import EddsaJcs2022 | ||
|
||
CRYPTOSUITES = { | ||
"eddsa-jcs-2022": EddsaJcs2022, | ||
} | ||
|
||
__all__ = [ | ||
"EddsaJcs2022", | ||
] |
204 changes: 204 additions & 0 deletions
204
aries_cloudagent/vc/data_integrity/cryptosuites/eddsa_jcs_2022.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
"""EddsaJcs2022 cryptosuite.""" | ||
|
||
from hashlib import sha256 | ||
import canonicaljson | ||
|
||
from ....wallet.base import BaseWallet | ||
from ....wallet.keys.manager import ( | ||
MultikeyManager, | ||
multikey_to_verkey, | ||
key_type_from_multikey, | ||
) | ||
from ....utils.multiformats import multibase | ||
from ....core.profile import ProfileSession | ||
from ....core.error import BaseError | ||
from ..models.options import DataIntegrityProofOptions | ||
from ..models.proof import DataIntegrityProof | ||
from ..models.verification_response import ProblemDetails, DataIntegrityVerificationResult | ||
from ..errors import PROBLEM_DETAILS | ||
from datetime import datetime | ||
|
||
|
||
class CryptosuiteError(BaseError): | ||
"""Generic Cryptosuite Error.""" | ||
|
||
|
||
class EddsaJcs2022: | ||
"""EddsaJcs2022 cryptosuite. | ||
https://www.w3.org/TR/vc-di-eddsa/#eddsa-jcs-2022. | ||
""" | ||
|
||
def __init__(self, *, session: ProfileSession): | ||
"""Create new EddsaJcs2022 Cryptosuite instance. | ||
Args: | ||
session: ProfileSession to use during crypto operations. | ||
""" | ||
super().__init__() | ||
self.session = session | ||
self.wallet = session.inject(BaseWallet) | ||
self.key_manager = MultikeyManager(session) | ||
|
||
async def create_proof( | ||
self, unsecured_data_document: dict, options: DataIntegrityProofOptions | ||
): | ||
"""Create proof algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#create-proof-eddsa-jcs-2022. | ||
""" | ||
proof = DataIntegrityProof.deserialize(options.serialize().copy()) | ||
|
||
# Spec says to copy document context to the proof but it's unecessary IMO, | ||
# commenting out for the time being... | ||
|
||
# if '@context' in unsecured_data_document: | ||
# proof['@context'] = unsecured_data_document['@context'] | ||
|
||
proof_config = self.proof_configuration(proof) | ||
transformed_data = self.transformation(unsecured_data_document, options) | ||
hash_data = self.hashing(transformed_data, proof_config) | ||
proof_bytes = await self.proof_serialization(hash_data, options) | ||
|
||
proof.proof_value = multibase.encode(proof_bytes, "base58btc") | ||
|
||
return proof | ||
|
||
def proof_configuration(self, options: DataIntegrityProofOptions): | ||
"""Proof configuration algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#proof-configuration-eddsa-jcs-2022. | ||
""" | ||
proof_config = options | ||
assert ( | ||
proof_config.type == "DataIntegrityProof" | ||
), 'Expected proof.type to be "DataIntegrityProof' | ||
assert ( | ||
proof_config.cryptosuite == "eddsa-jcs-2022" | ||
), 'Expected proof.cryptosuite to be "eddsa-jcs-2022' | ||
|
||
if proof_config.created: | ||
assert datetime.fromisoformat(proof_config.created) | ||
|
||
if proof_config.expires: | ||
assert datetime.fromisoformat(proof_config.expires) | ||
|
||
return self._canonicalize(proof_config.serialize()) | ||
|
||
def transformation( | ||
self, unsecured_document: dict, options: DataIntegrityProofOptions | ||
): | ||
"""Transformation algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#transformation-eddsa-jcs-2022. | ||
""" | ||
assert ( | ||
options.type == "DataIntegrityProof" | ||
), "Expected proof.type to be `DataIntegrityProof`" | ||
assert ( | ||
options.cryptosuite == "eddsa-jcs-2022" | ||
), "Expected proof.cryptosuite to be `eddsa-jcs-2022`" | ||
|
||
return self._canonicalize(unsecured_document) | ||
|
||
def hashing(self, transformed_document: bytes, canonical_proof_config: bytes): | ||
"""Hashing algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#hashing-eddsa-jcs-2022. | ||
""" | ||
return ( | ||
sha256(canonical_proof_config).digest() | ||
+ sha256(transformed_document).digest() | ||
) | ||
|
||
async def proof_serialization( | ||
self, hash_data: bytes, options: DataIntegrityProofOptions | ||
): | ||
"""Proof Serialization Algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#proof-serialization-eddsa-jcs-2022. | ||
""" | ||
# TODO encapsulate in a key manager method | ||
if options.verification_method.startswith("did:key:"): | ||
multikey = options.verification_method.split("#")[-1] | ||
key_info = await self.key_manager.from_multikey(multikey) | ||
|
||
else: | ||
key_info = await self.key_manager.from_kid(options.verification_method) | ||
|
||
return await self.wallet.sign_message( | ||
message=hash_data, | ||
from_verkey=multikey_to_verkey(key_info["multikey"]), | ||
) | ||
|
||
def _canonicalize(self, data: dict): | ||
"""Json canonicalization.""" | ||
return canonicaljson.encode_canonical_json(data) | ||
|
||
async def verify_proof(self, secured_document: dict): | ||
"""Verify proof algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#verify-proof-eddsa-jcs-2022. | ||
""" | ||
unsecured_document = secured_document.copy() | ||
proof = unsecured_document.pop("proof") | ||
proof_options = proof.copy() | ||
proof_bytes = multibase.decode(proof_options.pop("proofValue")) | ||
|
||
try: | ||
# Currently leaving context processing out of scope, | ||
# leaving code commented as it's technically an algorithm step. | ||
# Due to the cryptosuite being based on JSON canonicalization, | ||
# the integrity of the document is protected without RDF processing. | ||
|
||
# https://www.w3.org/TR/vc-data-integrity/#validating-contexts | ||
|
||
# assert secured_document['@context'] == proof_options['@context'] | ||
# unsecured_document['@context'] = proof_options['@context'] | ||
|
||
proof_options = DataIntegrityProofOptions.deserialize(proof_options) | ||
transformed_data = self.transformation(unsecured_document, proof_options) | ||
proof_config = self.proof_configuration(proof_options) | ||
hash_data = self.hashing(transformed_data, proof_config) | ||
verified = await self.proof_verification( | ||
hash_data, proof_bytes, proof_options | ||
) | ||
if not verified: | ||
raise CryptosuiteError("Invalid signature.") | ||
|
||
except CryptosuiteError as err: | ||
problem_detail = ProblemDetails.deserialize( | ||
PROBLEM_DETAILS["PROOF_VERIFICATION_ERROR"] | ||
) | ||
problem_detail.detail = str(err) | ||
return DataIntegrityVerificationResult( | ||
verified=False, | ||
proof=DataIntegrityProof.deserialize(proof), | ||
problem_details=[problem_detail], | ||
) | ||
|
||
return DataIntegrityVerificationResult( | ||
verified=True, | ||
proof=DataIntegrityProof.deserialize(proof), | ||
problem_details=[], | ||
) | ||
|
||
async def proof_verification( | ||
self, hash_data: bytes, proof_bytes: bytes, options: DataIntegrityProofOptions | ||
): | ||
"""Proof verification algorithm. | ||
https://www.w3.org/TR/vc-di-eddsa/#proof-verification-eddsa-jcs-2022. | ||
""" | ||
multikey = await MultikeyManager( | ||
self.session | ||
).resolve_multikey_from_verification_method(options.verification_method) | ||
verkey = multikey_to_verkey(multikey) | ||
key_type = key_type_from_multikey(multikey) | ||
return await self.wallet.verify_message( | ||
message=hash_data, | ||
signature=proof_bytes, | ||
from_verkey=verkey, | ||
key_type=key_type, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
"""Problem Details objects for error handling.""" | ||
|
||
# https://www.w3.org/TR/vc-data-integrity/#processing-errors | ||
PROBLEM_DETAILS = { | ||
"PROOF_GENERATION_ERROR": { | ||
"type": "https://w3id.org/security#PROOF_GENERATION_ERROR" | ||
}, | ||
"PROOF_VERIFICATION_ERROR": { | ||
"type": "https://w3id.org/security#PROOF_VERIFICATION_ERROR" | ||
}, | ||
"PROOF_TRANSFORMATION_ERROR": { | ||
"type": "https://w3id.org/security#PROOF_TRANSFORMATION_ERROR" | ||
}, | ||
"INVALID_DOMAIN_ERROR": {"type": "https://w3id.org/security#INVALID_DOMAIN_ERROR"}, | ||
"INVALID_CHALLENGE_ERROR": { | ||
"type": "https://w3id.org/security#INVALID_CHALLENGE_ERROR" | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
"""DataIntegrity class.""" | ||
|
||
from ...core.profile import ProfileSession | ||
from ...core.error import BaseError | ||
from ...resolver.base import DIDNotFound | ||
from .cryptosuites import EddsaJcs2022 | ||
from .models.proof import DataIntegrityProof | ||
from .models.options import DataIntegrityProofOptions | ||
from .models.verification_response import ( | ||
DataIntegrityVerificationResponse, | ||
DataIntegrityVerificationResult, | ||
ProblemDetails, | ||
) | ||
from .errors import PROBLEM_DETAILS | ||
|
||
from datetime import datetime | ||
|
||
CRYPTOSUITES = { | ||
"eddsa-jcs-2022": EddsaJcs2022, | ||
} | ||
|
||
PROOF_TYPES = ["DataIntegrityProof"] | ||
|
||
PROOF_PURPOSES = [ | ||
"authentication", | ||
"assertionMethod", | ||
] | ||
|
||
|
||
class DataIntegrityManagerError(BaseError): | ||
"""Generic DataIntegrityManager Error.""" | ||
|
||
|
||
class DataIntegrityManager: | ||
"""Class for managing data integrity proofs.""" | ||
|
||
def __init__(self, session: ProfileSession): | ||
"""Initialize the DataIntegrityManager.""" | ||
self.session = session | ||
|
||
async def add_proof(self, document: dict, options: DataIntegrityProofOptions): | ||
"""Data integrity add proof algorithm. | ||
https://www.w3.org/TR/vc-data-integrity/#add-proof. | ||
""" | ||
self.validate_proof_options(options) | ||
suite = self.select_suite(options) | ||
|
||
# Capture existing proofs if any | ||
all_proofs = document.pop("proof", []) | ||
if not isinstance(all_proofs, list) and not isinstance(all_proofs, dict): | ||
raise DataIntegrityManagerError("Expected proof to be a list or an object.") | ||
|
||
all_proofs = [all_proofs] if isinstance(all_proofs, dict) else all_proofs | ||
|
||
# Create secured document and create new proof | ||
secured_document = document.copy() | ||
secured_document["proof"] = all_proofs | ||
proof = await suite.create_proof(document, options) | ||
secured_document["proof"].append(proof.serialize()) | ||
return secured_document | ||
|
||
async def verify_proof(self, secured_document: dict): | ||
"""Verify a proof attached to a secured document. | ||
https://www.w3.org/TR/vc-data-integrity/#verify-proof. | ||
""" | ||
unsecured_document = secured_document.copy() | ||
all_proofs = unsecured_document.pop("proof") | ||
all_proofs = all_proofs if isinstance(all_proofs, list) else [all_proofs] | ||
verification_results = [] | ||
for proof in all_proofs: | ||
proof_options = proof.copy() | ||
proof_options.pop("proofValue") | ||
proof_options = DataIntegrityProofOptions.deserialize(proof_options) | ||
try: | ||
self.validate_proof_options(proof_options) | ||
suite = self.select_suite(proof_options) | ||
input_document = unsecured_document.copy() | ||
input_document["proof"] = proof | ||
verification_result = await suite.verify_proof(input_document) | ||
|
||
except (DataIntegrityManagerError, DIDNotFound) as err: | ||
problem_detail = ProblemDetails.deserialize( | ||
PROBLEM_DETAILS["PROOF_VERIFICATION_ERROR"] | ||
) | ||
problem_detail.detail = str(err) | ||
verification_result = DataIntegrityVerificationResult( | ||
verified=False, | ||
proof=DataIntegrityProof.deserialize(proof), | ||
problem_details=[problem_detail], | ||
) | ||
verification_results.append(verification_result) | ||
return DataIntegrityVerificationResponse( | ||
verified=( | ||
True if all(result.verified for result in verification_results) else False | ||
), | ||
verified_document=unsecured_document, | ||
results=verification_results, | ||
) | ||
|
||
def select_suite(self, options: DataIntegrityProofOptions): | ||
"""Instanciate a cryptographic suite. | ||
https://www.w3.org/TR/vc-data-integrity/#cryptographic-suites. | ||
""" | ||
if options.type == "DataIntegrityProof": | ||
suite = CRYPTOSUITES[options.cryptosuite](session=self.session) | ||
|
||
elif options.type in PROOF_TYPES: | ||
# TODO add support for Ed25519Signature2020 | ||
pass | ||
|
||
else: | ||
raise DataIntegrityManagerError(f"Unsupported proof type {options.type}") | ||
return suite | ||
|
||
def validate_proof_options(self, proof_options: DataIntegrityProofOptions): | ||
"""Generic proof assertions for a data integrity proof options.""" | ||
if proof_options.created: | ||
try: | ||
datetime.fromisoformat(proof_options.created) | ||
except ValueError: | ||
raise DataIntegrityManagerError( | ||
f"Invalid proof creation datetime format {proof_options.created}" | ||
) | ||
if proof_options.expires: | ||
try: | ||
datetime.fromisoformat(proof_options.expires) | ||
except ValueError: | ||
raise DataIntegrityManagerError( | ||
f"Invalid proof expiration datetime format {proof_options.expires}" | ||
) | ||
if proof_options.type not in PROOF_TYPES: | ||
raise DataIntegrityManagerError( | ||
f"Unsupported proof type {proof_options.type}" | ||
) | ||
if proof_options.type == "DataIntegrityProof": | ||
if not proof_options.cryptosuite: | ||
raise DataIntegrityManagerError( | ||
"DataIntegrityProof must specify a cryptosuite." | ||
) | ||
if proof_options.cryptosuite not in CRYPTOSUITES: | ||
raise DataIntegrityManagerError( | ||
f"Unsupported cryptosuite {proof_options.cryptosuite}" | ||
) | ||
if proof_options.proof_purpose not in PROOF_PURPOSES: | ||
raise DataIntegrityManagerError( | ||
f"Unsupported proof purpose {proof_options.proof_purpose}" | ||
) |
Oops, something went wrong.