Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to anoncreds via api endpoint #2922

Merged
merged 6 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
setup_aiohttp_apispec,
validation_middleware,
)

from marshmallow import fields

from aries_cloudagent.wallet import singletons

from ..config.injection_context import InjectionContext
from ..config.logging import context_wallet_id
from ..core.event_bus import Event, EventBus
Expand All @@ -31,13 +32,16 @@
from ..messaging.responder import BaseResponder
from ..messaging.valid import UUIDFour
from ..multitenant.base import BaseMultitenantManager, MultitenantManagerError
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING
from ..transport.outbound.message import OutboundMessage
from ..transport.outbound.status import OutboundSendStatus
from ..transport.queue.basic import BasicMessageQueue
from ..utils.stats import Collector
from ..utils.task_queue import TaskQueue
from ..version import __version__
from ..wallet.anoncreds_upgrade import check_upgrade_completion_loop
from .base_server import BaseAdminServer
from .error import AdminSetupError
from .request_context import AdminRequestContext
Expand All @@ -58,6 +62,9 @@
"acapy::keylist::updated": "keylist",
}

anoncreds_wallets = singletons.IsAnoncredsSingleton().wallets
in_progress_upgrades = singletons.UpgradeInProgressSingleton()


class AdminModulesSchema(OpenAPISchema):
"""Schema for the modules endpoint."""
Expand Down Expand Up @@ -205,6 +212,40 @@ async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
raise web.HTTPServiceUnavailable(reason="Shutdown in progress")


@web.middleware
async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine):
"""Blocking middleware for upgrades."""
context: AdminRequestContext = request["context"]

# Already upgraded
if context.profile.name in anoncreds_wallets:
return await handler(request)

# Upgrade in progress
if context.profile.name in in_progress_upgrades.wallets:
raise web.HTTPServiceUnavailable(reason="Upgrade in progress")

# Avoid try/except in middleware with find_all_records
upgrade_initiated = []
async with context.profile.session() as session:
storage = session.inject(BaseStorage)
upgrade_initiated = await storage.find_all_records(RECORD_TYPE_ACAPY_UPGRADING)
if upgrade_initiated:
# If we get here, than another instance started an upgrade
# We need to check for completion (or fail) in another process
in_progress_upgrades.set_wallet(context.profile.name)
is_subwallet = context.metadata and "wallet_id" in context.metadata
asyncio.create_task(
check_upgrade_completion_loop(
context.profile,
is_subwallet,
)
)
raise web.HTTPServiceUnavailable(reason="Upgrade in progress")

return await handler(request)


@web.middleware
async def debug_middleware(request: web.BaseRequest, handler: Coroutine):
"""Show request detail in debug log."""
Expand Down Expand Up @@ -351,6 +392,8 @@ async def check_multitenant_authorization(request: web.Request, handler):

is_multitenancy_path = path.startswith("/multitenancy")
is_server_path = path in self.server_paths or path == "/features"
# allow base wallets to trigger update through api
is_upgrade_path = path.startswith("/anoncreds/wallet/upgrade")

# subwallets are not allowed to access multitenancy routes
if authorization_header and is_multitenancy_path:
Expand Down Expand Up @@ -380,6 +423,7 @@ async def check_multitenant_authorization(request: web.Request, handler):
and not is_unprotected_path(path)
and not base_limited_access_path
and not (request.method == "OPTIONS") # CORS fix
and not is_upgrade_path
):
raise web.HTTPUnauthorized()

Expand Down Expand Up @@ -453,6 +497,9 @@ async def setup_context(request: web.Request, handler):

middlewares.append(setup_context)

# Upgrade middleware needs the context setup
middlewares.append(upgrade_middleware)

# Register validation_middleware last avoiding unauthorized validations
middlewares.append(validation_middleware)

Expand Down Expand Up @@ -583,6 +630,10 @@ def sort_dict(raw: dict) -> dict:

async def stop(self) -> None:
"""Stop the webserver."""
# Stopped before admin server is created
if not self.app:
return

self.app._state["ready"] = False # in case call does not come through OpenAPI
for queue in self.websocket_queues.values():
queue.stop()
Expand Down
55 changes: 51 additions & 4 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import gc
import json
from unittest import IsolatedAsyncioTestCase

import pytest
from aries_cloudagent.tests import mock
from unittest import IsolatedAsyncioTestCase
from aiohttp import ClientSession, DummyCookieJar, TCPConnector, web
from aiohttp.test_utils import unused_port

from aries_cloudagent.tests import mock
from aries_cloudagent.wallet import singletons

from ...config.default_context import DefaultContextBuilder
from ...config.injection_context import InjectionContext
from ...core.event_bus import Event
from ...core.goal_code_registry import GoalCodeRegistry
from ...core.in_memory import InMemoryProfile
from ...core.protocol_registry import ProtocolRegistry
from ...core.goal_code_registry import GoalCodeRegistry
from ...storage.base import BaseStorage
from ...storage.record import StorageRecord
from ...storage.type import RECORD_TYPE_ACAPY_UPGRADING
from ...utils.stats import Collector
from ...utils.task_queue import TaskQueue

from ...wallet.anoncreds_upgrade import UPGRADING_RECORD_IN_PROGRESS
from .. import server as test_module
from ..request_context import AdminRequestContext
from ..server import AdminServer, AdminSetupError


Expand Down Expand Up @@ -477,6 +483,47 @@ async def test_server_health_state(self):
assert response.status == 503
await server.stop()

async def test_upgrade_middleware(self):
profile = InMemoryProfile.test_profile()
self.context = AdminRequestContext.test_context({}, profile)
self.request_dict = {
"context": self.context,
}
request = mock.MagicMock(
method="GET",
path_qs="/schemas/created",
match_info={},
__getitem__=lambda _, k: self.request_dict[k],
)
handler = mock.CoroutineMock()

await test_module.upgrade_middleware(request, handler)

async with profile.session() as session:
storage = session.inject(BaseStorage)
upgrading_record = StorageRecord(
RECORD_TYPE_ACAPY_UPGRADING,
UPGRADING_RECORD_IN_PROGRESS,
)
# No upgrade in progress
await storage.add_record(upgrading_record)

# Upgrade in progress without cache
with self.assertRaises(test_module.web.HTTPServiceUnavailable):
await test_module.upgrade_middleware(request, handler)

# Upgrade in progress with cache
singletons.UpgradeInProgressSingleton().set_wallet("test-profile")
with self.assertRaises(test_module.web.HTTPServiceUnavailable):
await test_module.upgrade_middleware(request, handler)

singletons.UpgradeInProgressSingleton().remove_wallet("test-profile")
await storage.delete_record(upgrading_record)

# Upgrade in progress with cache
singletons.IsAnoncredsSingleton().set_wallet("test-profile")
await test_module.upgrade_middleware(request, handler)


@pytest.fixture
async def server():
Expand Down
6 changes: 1 addition & 5 deletions aries_cloudagent/anoncreds/default/legacy_indy/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1123,18 +1123,14 @@ async def fix_ledger_entry(

async def txn_submit(
self,
profile: Profile,
ledger: BaseLedger,
ledger_transaction: str,
sign: bool = None,
taa_accept: bool = None,
sign_did: DIDInfo = sentinel,
write_ledger: bool = True,
) -> str:
"""Submit a transaction to the ledger."""
ledger = profile.inject(BaseLedger)

if not ledger:
raise LedgerError("No ledger available")

try:
async with ledger:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from base58 import alphabet

from .....anoncreds.base import (
AnonCredsRegistrationError,
AnonCredsSchemaAlreadyExists,
)
from .....anoncreds.models.anoncreds_schema import (
Expand All @@ -21,7 +20,7 @@
from .....connections.models.conn_record import ConnRecord
from .....core.in_memory.profile import InMemoryProfile
from .....ledger.base import BaseLedger
from .....ledger.error import LedgerError, LedgerObjectAlreadyExistsError
from .....ledger.error import LedgerObjectAlreadyExistsError
from .....messaging.responder import BaseResponder
from .....protocols.endorse_transaction.v1_0.manager import (
TransactionManager,
Expand Down Expand Up @@ -728,27 +727,16 @@ async def test_register_revocation_registry_definition_with_create_transaction_a
assert mock_create_record.called

async def test_txn_submit(self):
self.profile.inject = mock.MagicMock(
side_effect=[
None,
mock.CoroutineMock(
txn_submit=mock.CoroutineMock(side_effect=LedgerError("test error"))
),
mock.CoroutineMock(
txn_submit=mock.CoroutineMock(return_value="transaction response")
),
]
self.profile.context.injector.bind_instance(
BaseLedger,
mock.MagicMock(
txn_submit=mock.CoroutineMock(return_value="transaction_id")
),
)

# No ledger
with self.assertRaises(LedgerError):
await self.registry.txn_submit(self.profile, "test_txn")
# Write error
with self.assertRaises(AnonCredsRegistrationError):
await self.registry.txn_submit(self.profile, "test_txn")

result = await self.registry.txn_submit(self.profile, "test_txn")
assert result == "transaction response"
async with self.profile.session() as session:
ledger = session.inject(BaseLedger)
result = await self.registry.txn_submit(ledger, "test_txn")
assert result == "transaction_id"

async def test_register_revocation_list_no_endorsement(self):
self.profile.context.injector.bind_instance(
Expand Down
Loading