Skip to content

Commit

Permalink
add type hints and pass session to call functions instead of profile
Browse files Browse the repository at this point in the history
Signed-off-by: PatStLouis <[email protected]>
  • Loading branch information
PatStLouis committed Sep 24, 2024
1 parent b6f837a commit 4c94560
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 68 deletions.
86 changes: 40 additions & 46 deletions aries_cloudagent/wallet/keys/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Multikey class."""

from ...core.profile import Profile
from ...core.profile import ProfileSession
from ..base import BaseWallet
from ..key_type import ED25519
from ..util import b58_to_bytes, bytes_to_b58
Expand All @@ -20,23 +20,23 @@ class MultikeyManagerError(Exception):
class MultikeyManager:
"""Class for managing wallet keys."""

def __init__(self, profile=Profile):
def __init__(self, session: ProfileSession):
"""Initialize the MultikeyManager."""
self.profile = profile
self.session = session

def _multikey_to_verkey(self, multikey, alg=DEFAULT_ALG):
def _multikey_to_verkey(self, multikey: str, alg: str = DEFAULT_ALG):
"""Transform multikey to verkey."""
prefix_length = ALG_MAPPINGS[alg]["prefix_length"]
public_bytes = bytes(bytearray(multibase.decode(multikey))[prefix_length:])
return bytes_to_b58(public_bytes)

def _verkey_to_multikey(self, verkey, alg=DEFAULT_ALG):
def _verkey_to_multikey(self, verkey: str, alg: str = DEFAULT_ALG):
"""Transform verkey to multikey."""
prefix_hex = ALG_MAPPINGS[alg]["prefix_hex"]
prefixed_key_hex = f"{prefix_hex}{b58_to_bytes(verkey).hex()}"
return multibase.encode(bytes.fromhex(prefixed_key_hex), "base58btc")

async def kid_exists(self, wallet, kid):
async def kid_exists(self, wallet: BaseWallet, kid: str):
"""Check if kid exists."""
try:
key = await wallet.get_key_by_kid(kid=kid)
Expand All @@ -47,59 +47,53 @@ async def kid_exists(self, wallet, kid):

async def from_kid(self, kid: str):
"""Fetch a single key."""
async with self.profile.session() as session:
wallet: BaseWallet | None = session.inject_or(BaseWallet)
key_info = await wallet.get_key_by_kid(kid=kid)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}
wallet: BaseWallet | None = self.session.inject_or(BaseWallet)
key_info = await wallet.get_key_by_kid(kid=kid)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}

async def from_multikey(self, multikey: str):
"""Fetch a single key."""
async with self.profile.session() as session:
wallet: BaseWallet | None = session.inject_or(BaseWallet)
key_info = await wallet.get_signing_key(
verkey=self._multikey_to_verkey(multikey)
)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}

async def create(self, seed=None, kid=None, alg=DEFAULT_ALG):
wallet: BaseWallet | None = self.session.inject_or(BaseWallet)
key_info = await wallet.get_signing_key(verkey=self._multikey_to_verkey(multikey))
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}

async def create(self, seed: str = None, kid: str = None, alg: str = DEFAULT_ALG):
"""Create a new key pair."""

if alg not in ALG_MAPPINGS:
raise MultikeyManagerError(
f"Unknown key algorithm, use one of {list(ALG_MAPPINGS.keys())}."
)

async with self.profile.session() as session:
wallet: BaseWallet | None = session.inject_or(BaseWallet)
wallet: BaseWallet | None = self.session.inject_or(BaseWallet)

if kid and await self.kid_exists(wallet=wallet, kid=kid):
raise MultikeyManagerError(f"kid '{kid}' already exists in wallet.")
if kid and await self.kid_exists(wallet=wallet, kid=kid):
raise MultikeyManagerError(f"kid '{kid}' already exists in wallet.")

key_type = ALG_MAPPINGS[alg]["key_type"]
key_info = await wallet.create_key(key_type=key_type, seed=seed, kid=kid)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}
key_type = ALG_MAPPINGS[alg]["key_type"]
key_info = await wallet.create_key(key_type=key_type, seed=seed, kid=kid)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}

async def update(self, multikey: str, kid: str):
"""Assign a new kid to a key pair."""

async with self.profile.session() as session:
wallet: BaseWallet | None = session.inject_or(BaseWallet)

if kid and await self.kid_exists(wallet=wallet, kid=kid):
raise MultikeyManagerError(f"kid '{kid}' already exists in wallet.")
key_info = await wallet.assign_kid_to_key(
verkey=self._multikey_to_verkey(multikey), kid=kid
)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}
wallet: BaseWallet | None = self.session.inject_or(BaseWallet)

if kid and await self.kid_exists(wallet=wallet, kid=kid):
raise MultikeyManagerError(f"kid '{kid}' already exists in wallet.")
key_info = await wallet.assign_kid_to_key(
verkey=self._multikey_to_verkey(multikey), kid=kid
)
return {
"kid": key_info.kid,
"multikey": self._verkey_to_multikey(key_info.verkey),
}
16 changes: 11 additions & 5 deletions aries_cloudagent/wallet/keys/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ async def fetch_key(request: web.BaseRequest):
multikey = request.match_info["multikey"]

try:
async with context.session() as session:
key_info = await MultikeyManager(session).from_multikey(multikey=multikey)
return web.json_response(
await MultikeyManager(context.profile).from_multikey(multikey=multikey),
key_info,
status=200,
)

Expand Down Expand Up @@ -168,8 +170,10 @@ async def create_key(request: web.BaseRequest):
raise MultikeyManagerError("Seed support is not enabled.")

try:
async with context.session() as session:
key_info = await MultikeyManager(session).create(seed=seed, kid=kid, alg=alg)
return web.json_response(
await MultikeyManager(context.profile).create(seed=seed, kid=kid, alg=alg),
key_info,
status=201,
)
except (MultikeyManagerError, WalletDuplicateError, WalletNotFoundError) as err:
Expand Down Expand Up @@ -197,11 +201,13 @@ async def update_key(request: web.BaseRequest):
kid = body.get("kid")

try:
return web.json_response(
await MultikeyManager(context.profile).update(
async with context.session() as session:
key_info = await MultikeyManager(session).update(
multikey=multikey,
kid=kid,
),
)
return web.json_response(
key_info,
status=200,
)
except (MultikeyManagerError, WalletDuplicateError, WalletNotFoundError) as err:
Expand Down
46 changes: 29 additions & 17 deletions aries_cloudagent/wallet/keys/tests/test_key_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,34 @@ class TestKeyOperations(IsolatedAsyncioTestCase):
kid = "did:web:example.com#key-01"

async def test_key_creation(self):
key_info = await self.manager.create(seed=self.seed)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] is None

key_info = await self.manager.from_multikey(multikey=self.multikey)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] is None

key_info = await self.manager.update(multikey=self.multikey, kid=self.kid)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] == self.kid

key_info = await self.manager.from_kid(kid=self.kid)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] == self.kid
async with self.profile.session() as session:
key_info = await MultikeyManager(session=session).create(seed=self.seed)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] is None

key_info = await MultikeyManager(session=session).from_multikey(
multikey=self.multikey
)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] is None

key_info = await MultikeyManager(session=session).update(
multikey=self.multikey, kid=self.kid
)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] == self.kid

key_info = await MultikeyManager(session=session).from_kid(kid=self.kid)
assert key_info["multikey"] == self.multikey
assert key_info["kid"] == self.kid

async def test_key_representations(self):
assert self.manager._multikey_to_verkey(self.multikey) == self.verkey
assert self.manager._verkey_to_multikey(self.verkey) == self.multikey
async with self.profile.session() as session:
assert (
MultikeyManager(session=session)._multikey_to_verkey(self.multikey)
== self.verkey
)
assert (
MultikeyManager(session=session)._verkey_to_multikey(self.verkey)
== self.multikey
)

0 comments on commit 4c94560

Please sign in to comment.