Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multi-tenancy] add profile resource management #928

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ async def stop(self, timeout=1.0):
# close multitenant profiles
multitenant_mgr = self.context.inject(MultitenantManager, required=False)
if multitenant_mgr:
for profile in multitenant_mgr._instances.values():
for profile in multitenant_mgr._profiles.profiles.values():
shutdown.run(profile.close())

if self.root_profile:
Expand Down
14 changes: 8 additions & 6 deletions aries_cloudagent/core/tests/test_conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,12 +858,14 @@ async def test_shutdown_multitenant_profiles(self):

multitenant_mgr = conductor.context.inject(MultitenantManager)

multitenant_mgr._instances = {
"test1": async_mock.MagicMock(close=async_mock.CoroutineMock()),
"test2": async_mock.MagicMock(close=async_mock.CoroutineMock()),
}
await multitenant_mgr._profiles.put(
"test1", async_mock.MagicMock(close=async_mock.CoroutineMock())
)
await multitenant_mgr._profiles.put(
"test2", async_mock.MagicMock(close=async_mock.CoroutineMock())
)

await conductor.stop()

multitenant_mgr._instances["test1"].close.assert_called_once_with()
multitenant_mgr._instances["test2"].close.assert_called_once_with()
multitenant_mgr._profiles.get("test1").close.assert_called_once_with()
multitenant_mgr._profiles.get("test2").close.assert_called_once_with()
94 changes: 94 additions & 0 deletions aries_cloudagent/multitenant/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Cache for multitenancy profiles."""

import logging
import sys
from collections import OrderedDict
from typing import Optional

from ..core.profile import Profile

LOGGER = logging.getLogger(__name__)


class ProfileCache:
"""Profile cache that caches based on LRU strategy."""

def __init__(self, capacity: int):
"""Initialize ProfileCache.

Args:
capacity: The capacity of the cache. If capacity is exceeded
profiles are closed.
"""

self.profiles: OrderedDict[str, Profile] = OrderedDict()
self.capacity = capacity

async def _cleanup(self):
for (key, profile) in self.profiles.items():
# When ref count is 4 we can assume the profile is not referenced
# 1 = profiles dict
# 2 = self.profiles.items()
# 3 = profile above
# 4 = sys.getrefcount
if sys.getrefcount(profile) <= 4:
LOGGER.debug(f"closing profile with id {key}")
del self.profiles[key]
await profile.close()

if len(self.profiles) <= self.capacity:
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit odd that this will always iterate through the profiles, short-circuiting if there is one to be cleaned up and we are now below max capacity. Also self.profiles.items() will increase the refcount of the dict itself but not the entries it contains, as I understand it (and according to my test just now).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not that happy with this approach myself. I'll see if I can rework it a bit.

Do you have a suggestion on how to approach this? Do you think the way ledger handles refcounting will also work for here?


def get(self, key: str) -> Optional[Profile]:
"""Get profile with associated key from cache.

Args:
key (str): the key to get the profile for.

Returns:
Optional[Profile]: Profile if found in cache.

"""
if key not in self.profiles:
return None
else:
self.profiles.move_to_end(key)
return self.profiles[key]

def has(self, key: str) -> bool:
"""Check whether there is a profile with associated key in the cache.

Args:
key (str): the key to check for a profile

Returns:
bool: Whether the key exists in the cache

"""
return key in self.profiles

async def put(self, key: str, value: Profile) -> None:
"""Add profile with associated key to the cache.

If new profile exceeds the cache capacity least recently used profiles
that are not used will be removed from the cache.

Args:
key (str): the key to set
value (Profile): the profile to set
"""
self.profiles[key] = value
self.profiles.move_to_end(key)
LOGGER.debug(f"setting profile with id {key} in profile cache")

if len(self.profiles) > self.capacity:
LOGGER.debug(f"profile limit of {self.capacity} reached. cleaning...")
await self._cleanup()

def remove(self, key: str):
"""Remove profile with associated key from the cache.

Args:
key (str): The key to remove from the cache.
"""
del self.profiles[key]
11 changes: 6 additions & 5 deletions aries_cloudagent/multitenant/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import jwt
from typing import List, Optional, cast

from .cache import ProfileCache
from ..core.profile import (
Profile,
ProfileSession,
Expand Down Expand Up @@ -47,7 +48,7 @@ def __init__(self, profile: Profile):
if not profile:
raise MultitenantManagerError("Missing profile")

self._instances: dict[str, Profile] = {}
self._profiles = ProfileCache(100)

@property
def profile(self) -> Profile:
Expand Down Expand Up @@ -118,7 +119,7 @@ async def get_wallet_profile(
"""
wallet_id = wallet_record.wallet_id

if wallet_id not in self._instances:
if not self._profiles.has(wallet_id):
# Extend base context
context = base_context.copy()

Expand Down Expand Up @@ -156,9 +157,9 @@ async def get_wallet_profile(

# MTODO: add ledger config
profile, _ = await wallet_config(context, provision=provision)
self._instances[wallet_id] = profile
await self._profiles.put(wallet_id, profile)

return self._instances[wallet_id]
return self._profiles.get(wallet_id)

async def create_wallet(
self,
Expand Down Expand Up @@ -251,7 +252,7 @@ async def remove_wallet(self, wallet_id: str, wallet_key: str = None):
{"wallet.key": wallet_key},
)

del self._instances[wallet_id]
self._profiles.remove(wallet_id)
await profile.remove()

# Remove all routing records associated with wallet
Expand Down
100 changes: 100 additions & 0 deletions aries_cloudagent/multitenant/tests/test_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import sys
from asynctest import TestCase as AsyncTestCase
from asynctest import mock as async_mock

from ..cache import ProfileCache


class TestProfileCache(AsyncTestCase):
async def setUp(self):
pass

async def test_cache_cleanup_capacity_reached(self):
with async_mock.patch.object(ProfileCache, "_cleanup") as _cleanup:
cache = ProfileCache(1)

await cache.put("1", async_mock.MagicMock())
_cleanup.assert_not_called()

await cache.put("2", async_mock.MagicMock())
_cleanup.assert_called_once()

async def test_get_not_in_cache(self):
cache = ProfileCache(1)

assert cache.get("1") is None

async def test_put_get_in_cache(self):
cache = ProfileCache(1)

profile = async_mock.MagicMock()
await cache.put("1", profile)

assert cache.get("1") is profile

async def test_remove(self):
cache = ProfileCache(1)

profile = async_mock.MagicMock()
await cache.put("1", profile)

assert cache.get("1") is profile

cache.remove("1")

assert cache.get("1") is None

async def test_has_true(self):
cache = ProfileCache(1)

profile = async_mock.MagicMock()

assert cache.has("1") is False
await cache.put("1", profile)
assert cache.has("1") is True

async def test_cleanup(self):
cache = ProfileCache(1)

with async_mock.patch.object(sys, "getrefcount") as getrefcount:
getrefcount.return_value = 4

profile1 = async_mock.MagicMock(close=async_mock.CoroutineMock())
profile2 = async_mock.MagicMock(close=async_mock.CoroutineMock())

await cache.put("1", profile1)

assert len(cache.profiles) == 1

await cache.put("2", profile2)

assert len(cache.profiles) == 1
assert cache.get("1") == None
profile1.close.assert_called_once()

async def test_cleanup_reference(self):
cache = ProfileCache(3)

with async_mock.patch.object(sys, "getrefcount") as getrefcount:
getrefcount.side_effect = [6, 4]

profile1 = async_mock.MagicMock(close=async_mock.CoroutineMock())
profile2 = async_mock.MagicMock(close=async_mock.CoroutineMock())
profile3 = async_mock.MagicMock(close=async_mock.CoroutineMock())
profile4 = async_mock.MagicMock(close=async_mock.CoroutineMock())

await cache.put("1", profile1)
await cache.put("2", profile2)
await cache.put("3", profile3)

assert len(cache.profiles) == 3

await cache.put("4", profile4)

assert len(cache.profiles) == 3
assert cache.get("1") == profile1
assert cache.get("2") == None
assert cache.get("3") == profile3
assert cache.get("4") == profile4

profile2.close.assert_called_once()
12 changes: 6 additions & 6 deletions aries_cloudagent/multitenant/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,28 @@ async def test_get_default_mediator(self):

async def test_get_wallet_profile_returns_from_cache(self):
wallet_record = WalletRecord(wallet_id="test")
self.manager._instances["test"] = InMemoryProfile.test_profile()
await self.manager._profiles.put("test", InMemoryProfile.test_profile())

with async_mock.patch(
"aries_cloudagent.config.wallet.wallet_config"
) as wallet_config:
profile = await self.manager.get_wallet_profile(
self.profile.context, wallet_record
)
assert profile is self.manager._instances["test"]
assert profile is self.manager._profiles.get("test")
wallet_config.assert_not_called()

async def test_get_wallet_profile_not_in_cache(self):
wallet_record = WalletRecord(wallet_id="test", settings={})
self.manager._instances["test"] = InMemoryProfile.test_profile()
await self.manager._profiles.put("test", InMemoryProfile.test_profile())

with async_mock.patch(
"aries_cloudagent.config.wallet.wallet_config"
) as wallet_config:
profile = await self.manager.get_wallet_profile(
self.profile.context, wallet_record
)
assert profile is self.manager._instances["test"]
assert profile is self.manager._profiles.get("test")
wallet_config.assert_not_called()

async def test_get_wallet_profile_settings(self):
Expand Down Expand Up @@ -382,13 +382,13 @@ async def test_remove_wallet_removes_profile_wallet_storage_records(self):
)
wallet_profile = InMemoryProfile.test_profile()

self.manager._instances["test"] = wallet_profile
await self.manager._profiles.put("test", wallet_profile)
retrieve_by_id.return_value = wallet_record
get_wallet_profile.return_value = wallet_profile

await self.manager.remove_wallet("test")

assert "test" not in self.manager._instances
assert not self.manager._profiles.has("test")
get_wallet_profile.assert_called_once_with(
self.profile.context, wallet_record, {"wallet.key": "test_key"}
)
Expand Down