Skip to content

Commit

Permalink
add models
Browse files Browse the repository at this point in the history
Signed-off-by: PatStLouis <[email protected]>
  • Loading branch information
PatStLouis committed Sep 27, 2024
1 parent d850f8f commit a11f848
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 131 deletions.
132 changes: 80 additions & 52 deletions aries_cloudagent/vc/data_integrity/cryptosuites/eddsa_jcs_2022.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
import canonicaljson

from ....wallet.base import BaseWallet
from ....wallet.keys.manager import MultikeyManager
from ....wallet.keys.manager import (
MultikeyManager,
multikey_to_verkey,
verkey_to_multikey,
key_type_from_multikey,
)
from ....utils.multiformats import multibase
from ....core.profile import ProfileSession
from ....resolver.did_resolver import DIDResolver
from ..models.options import DataIntegrityProofOptions
from ..models.proof import DataIntegrityProof
from ..models.verification_response import ProblemDetails, DataIntegrityVerificationResult
from ..errors import PROBLEM_DETAILS


Expand All @@ -33,12 +41,14 @@ def __init__(self, *, session: ProfileSession):
self.wallet = session.inject(BaseWallet)
self.key_manager = MultikeyManager(session)

async def create_proof(self, unsecured_data_document: dict, options: dict):
async def create_proof(
self, unsecured_data_document: dict, options: DataIntegrityProofOptions
):
"""Create proof algorithm.
https://www.w3.org/TR/vc-di-eddsa/#create-proof-eddsa-jcs-2022.
"""
proof = options.copy()
proof = DataIntegrityProof.deserialize(options.serialize().copy())

# Spec says to copy document context to the proof but it's unecessary IMO,
# commenting out for the time being...
Expand All @@ -51,53 +61,54 @@ async def create_proof(self, unsecured_data_document: dict, options: dict):
hash_data = self.hashing(transformed_data, proof_config)
proof_bytes = await self.proof_serialization(hash_data, options)

proof["proofValue"] = multibase.encode(proof_bytes, "base58btc")
proof.proof_value = multibase.encode(proof_bytes, "base58btc")

return proof

def proof_configuration(self, options: dict):
def proof_configuration(self, options: DataIntegrityProofOptions):
"""Proof configuration algorithm.
https://www.w3.org/TR/vc-di-eddsa/#proof-configuration-eddsa-jcs-2022.
"""
proof_config = options.copy()

proof_config = options
assert (
proof_config["type"] == "DataIntegrityProof"
proof_config.type == "DataIntegrityProof"
), 'Expected proof.type to be "DataIntegrityProof'
assert (
proof_config["cryptosuite"] == "eddsa-jcs-2022"
proof_config.cryptosuite == "eddsa-jcs-2022"
), 'Expected proof.cryptosuite to be "eddsa-jcs-2022'

if "created" in proof_config:
if proof_config.created:
# TODO assert proper [XMLSCHEMA11-2] dateTimeStamp string
assert proof_config[
"created"
], "Expected proof.created to be a [XMLSCHEMA11-2] dateTimeStamp string."
assert (
proof_config.created
), "Expected proof.created to be a [XMLSCHEMA11-2] dateTimeStamp string."

if "expires" in proof_config:
if proof_config.expires:
# TODO assert proper [XMLSCHEMA11-2] dateTimeStamp string
assert proof_config[
"expires"
], "Expected proof.expires to be a [XMLSCHEMA11-2] dateTimeStamp string."
assert (
proof_config.expires
), "Expected proof.expires to be a [XMLSCHEMA11-2] dateTimeStamp string."

return self._canonicalize(proof_config)
return self._canonicalize(proof_config.serialize())

def transformation(self, unsecured_document: dict, options: dict):
def transformation(
self, unsecured_document: dict, options: DataIntegrityProofOptions
):
"""Transformation algorithm.
https://www.w3.org/TR/vc-di-eddsa/#transformation-eddsa-jcs-2022.
"""
assert (
options["type"] == "DataIntegrityProof"
options.type == "DataIntegrityProof"
), "Expected proof.type to be `DataIntegrityProof`"
assert (
options["cryptosuite"] == "eddsa-jcs-2022"
options.cryptosuite == "eddsa-jcs-2022"
), "Expected proof.cryptosuite to be `eddsa-jcs-2022`"

return self._canonicalize(unsecured_document)

def hashing(self, transformed_document, canonical_proof_config):
def hashing(self, transformed_document: bytes, canonical_proof_config: bytes):
"""Hashing algorithm.
https://www.w3.org/TR/vc-di-eddsa/#hashing-eddsa-jcs-2022.
Expand All @@ -107,48 +118,47 @@ def hashing(self, transformed_document, canonical_proof_config):
+ sha256(transformed_document).digest()
)

async def proof_serialization(self, hash_data: bytes, options: dict):
async def proof_serialization(
self, hash_data: bytes, options: DataIntegrityProofOptions
):
"""Proof Serialization Algorithm.
https://www.w3.org/TR/vc-di-eddsa/#proof-serialization-eddsa-jcs-2022.
"""
# If the verification method is a did:key: URI,
# we derive the signing key from a multikey value
if options["verificationMethod"].startswith("did:key:"):
multikey = options["verificationMethod"].split("#")[-1]
# TODO encapsulate in a key manager method
if options.verification_method.startswith("did:key:"):
multikey = options.verification_method.split("#")[-1]
key_info = await self.key_manager.from_multikey(multikey)

# Otherwise we derive the signing key from a kid
else:
key_info = await self.key_manager.from_kid(options["verificationMethod"])
key_info = await self.key_manager.from_kid(options.verification_method)

return await self.wallet.sign_message(
message=hash_data,
from_verkey=self.key_manager._multikey_to_verkey(key_info["multikey"]),
from_verkey=multikey_to_verkey(key_info["multikey"]),
)

def _canonicalize(self, data: dict):
"""Json canonicalization."""
return canonicaljson.encode_canonical_json(data)

async def _get_multikey(self, kid: str):
async def _resolve_multikey(self, kid: str):
"""Derive a multikey from the verification method."""
resolver = self.session.inject(DIDResolver)
verification_method = await resolver.dereference(
profile=self.session.profile, did_url=kid
)

# If verification method is a did:key URI,
# we derive the multikey directly from the value.
if kid.startswith("did:key:"):
return kid.split("#")[-1]
if verification_method.type == "Ed25519VerificationKey2018":
multikey = verkey_to_multikey(verification_method.public_key_base58)

# Otherwise we resolve the verification method and extract the multikey.
else:
verification_method = await DIDResolver().dereference(
profile=self.session.profile, did_url=kid
)
assert (
verification_method["type"] == "Multikey"
), "Expected Multikey verificationMethod type"
verification_method.type == "Multikey"
), "Expecting Multikey verification method type"
multikey = verification_method.public_key_multibase

return verification_method["publicKeyMultibase"]
return multikey

async def verify_proof(self, secured_document: dict):
"""Verify proof algorithm.
Expand All @@ -171,31 +181,49 @@ async def verify_proof(self, secured_document: dict):
# assert secured_document['@context'] == proof_options['@context']
# unsecured_document['@context'] = proof_options['@context']

proof_options = DataIntegrityProofOptions.deserialize(proof_options)
transformed_data = self.transformation(unsecured_document, proof_options)
proof_config = self.proof_configuration(proof_options)
hash_data = self.hashing(transformed_data, proof_config)
if not await self.proof_verification(hash_data, proof_bytes, proof_options):
verified = await self.proof_verification(
hash_data, proof_bytes, proof_options
)
if not verified:
raise CryptosuiteError("Invalid signature.")

except (AssertionError, CryptosuiteError) as err:
problem_detail = PROBLEM_DETAILS["PROOF_VERIFICATION_ERROR"] | {
"message": str(err)
}
return {"verified": False, "proof": proof, "problemDetails": [problem_detail]}
problem_detail = ProblemDetails.deserialize(
PROBLEM_DETAILS["PROOF_VERIFICATION_ERROR"]
)
problem_detail.detail = str(err)
return DataIntegrityVerificationResult(
verified=False,
proof=DataIntegrityProof.deserialize(proof),
problem_details=[problem_detail],
)

return {"verified": True, "proof": proof, "problemDetails": []}
return DataIntegrityVerificationResult(
verified=True,
proof=DataIntegrityProof.deserialize(proof),
problem_details=[],
)

async def proof_verification(
self, hash_data: bytes, proof_bytes: bytes, options: dict
self, hash_data: bytes, proof_bytes: bytes, options: DataIntegrityProofOptions
):
"""Proof verification algorithm.
https://www.w3.org/TR/vc-di-eddsa/#proof-verification-eddsa-jcs-2022.
"""
multikey = await self._get_multikey(options["verificationMethod"])
multikey = await MultikeyManager(
self.session
).resolve_multikey_from_verification_method(options.verification_method)
# multikey = await self._resolve_multikey(options.verification_method)
verkey = multikey_to_verkey(multikey)
key_type = key_type_from_multikey(multikey)
return await self.wallet.verify_message(
message=hash_data,
signature=proof_bytes,
from_verkey=self.key_manager._multikey_to_verkey(multikey),
key_type=self.key_manager.key_type_from_multikey(multikey),
from_verkey=verkey,
key_type=key_type,
)
61 changes: 37 additions & 24 deletions aries_cloudagent/vc/data_integrity/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

from ...core.profile import ProfileSession
from .cryptosuites import CRYPTOSUITES
from .models.proof import DataIntegrityProof
from .models.options import DataIntegrityProofOptions
from .models.verification_response import (
DataIntegrityVerificationResponse,
DataIntegrityVerificationResult,
ProblemDetails,
)
from .errors import PROBLEM_DETAILS


class DataIntegrityManagerError(Exception):
Expand All @@ -15,14 +23,14 @@ def __init__(self, session: ProfileSession):
"""Initialize the DataIntegrityManager."""
self.session = session

async def add_proof(self, document, options):
async def add_proof(self, document: dict, options: DataIntegrityProofOptions):
"""Data integrity add proof algorithm.
https://www.w3.org/TR/vc-data-integrity/#add-proof.
"""

# Instanciate a cryptosuite
suite = CRYPTOSUITES[options["cryptosuite"]](session=self.session)
suite = CRYPTOSUITES[options.cryptosuite](session=self.session)

# Capture existing proofs if any
all_proofs = document.pop("proof", [])
Expand All @@ -32,47 +40,52 @@ async def add_proof(self, document, options):
# Create secured document and create new proof
secured_document = document.copy()
secured_document["proof"] = all_proofs
secured_document["proof"].append(await suite.create_proof(document, options))
proof = await suite.create_proof(document, options)
secured_document["proof"].append(proof.serialize())
return secured_document

async def verify_proof(self, secured_document):
async def verify_proof(self, secured_document: dict):
"""Verify a proof attached to a secured document.
https://www.w3.org/TR/vc-data-integrity/#verify-proof.
"""
unsecured_document = secured_document.copy()
all_proofs = unsecured_document.pop("proof")
all_proofs = all_proofs if isinstance(all_proofs, list) else [all_proofs]
verification_results = {}
verification_results["verifiedDocument"] = unsecured_document
verification_results["results"] = []
verification_results = []
for proof in all_proofs:
try:
proof = DataIntegrityProof.deserialize(proof)
self.assert_proof(proof)
# Instanciate a cryptosuite
suite = CRYPTOSUITES[proof["cryptosuite"]](session=self.session)
suite = CRYPTOSUITES[proof.cryptosuite](session=self.session)
input_document = unsecured_document.copy()
input_document["proof"] = proof
input_document["proof"] = proof.serialize()
verification_result = await suite.verify_proof(input_document)
verification_results["results"].append(verification_result)
except AssertionError as err:
verification_result = {
"verified": False,
"problemDetails": [{"type": "", "message": str(err)}],
}
verification_results["results"].append(verification_result)
verification_results["verified"] = (
True
if all(result["verified"] for result in verification_results["results"])
else False
problem_detail = ProblemDetails.deserialize(
PROBLEM_DETAILS["PROOF_VERIFICATION_ERROR"]
)
problem_detail.detail = str(err)
verification_result = DataIntegrityVerificationResult(
verified=False,
proof=proof,
problem_details=[problem_detail],
)
verification_results.append(verification_result)
return DataIntegrityVerificationResponse(
verified=(
True if all(result.verified for result in verification_results) else False
),
verified_document=unsecured_document,
results=verification_results,
)
return verification_results

def assert_proof(self, proof):
def assert_proof(self, proof: DataIntegrityProof):
"""Generic proof assertions for a data integrity proof."""
assert proof["cryptosuite"] in CRYPTOSUITES, "Unsupported cryptosuite."
assert proof["proofValue"], "Missing proof value."
assert proof["proofPurpose"] in [
assert proof.cryptosuite in CRYPTOSUITES, "Unsupported cryptosuite."
assert proof.proof_value, "Missing proof value."
assert proof.proof_purpose in [
"authentication",
"assertionMethod",
], "Unknown proofPurpose."
17 changes: 14 additions & 3 deletions aries_cloudagent/vc/data_integrity/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
from .proof import DIProof, DIProofSchema
from .options import AddProofOptionsSchema
from .proof import DataIntegrityProof, DataIntegrityProofSchema
from .options import DataIntegrityProofOptions, DataIntegrityProofOptionsSchema
from .verification_response import (
DataIntegrityVerificationResponseSchema,
DataIntegrityVerificationResponse,
)

__all__ = ["DIProof", "DIProofSchema", "DIProofOptions", "AddProofOptionsSchema"]
__all__ = [
"DataIntegrityProof",
"DataIntegrityProofSchema",
"DataIntegrityProofOptions",
"DataIntegrityProofOptionsSchema",
"DataIntegrityVerificationResponse",
"DataIntegrityVerificationResponseSchema",
]
Loading

0 comments on commit a11f848

Please sign in to comment.