diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index e31eacdd61..539dc5d4b4 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -50,6 +50,7 @@ from ..vc.ld_proofs.document_loader import DocumentLoader from ..wallet.did_info import DIDInfo from .dispatcher import Dispatcher +from .util import STARTUP_EVENT_TOPIC, SHUTDOWN_EVENT_TOPIC LOGGER = logging.getLogger(__name__) @@ -363,8 +364,14 @@ async def start(self) -> None: except Exception: LOGGER.exception("Error accepting mediation invitation") + # notify protcols of startup status + await self.root_profile.notify(STARTUP_EVENT_TOPIC, {}) + async def stop(self, timeout=1.0): """Stop the agent.""" + # notify protcols that we are shutting down + await self.root_profile.notify(SHUTDOWN_EVENT_TOPIC, {}) + shutdown = TaskQueue() if self.dispatcher: shutdown.run(self.dispatcher.complete()) diff --git a/aries_cloudagent/core/util.py b/aries_cloudagent/core/util.py new file mode 100644 index 0000000000..d97a6cdac1 --- /dev/null +++ b/aries_cloudagent/core/util.py @@ -0,0 +1,10 @@ +"""Core utilities and constants.""" + +import re + + +CORE_EVENT_PREFIX = "acapy::CORE::" +STARTUP_EVENT_TOPIC = CORE_EVENT_PREFIX + "STARTUP" +STARTUP_EVENT_PATTERN = re.compile(f"^{STARTUP_EVENT_TOPIC}?$") +SHUTDOWN_EVENT_TOPIC = CORE_EVENT_PREFIX + "SHUTDOWN" +SHUTDOWN_EVENT_PATTERN = re.compile(f"^{SHUTDOWN_EVENT_TOPIC}?$") diff --git a/aries_cloudagent/protocols/endorse_transaction/v1_0/routes.py b/aries_cloudagent/protocols/endorse_transaction/v1_0/routes.py index ee55492e71..54aa2e091d 100644 --- a/aries_cloudagent/protocols/endorse_transaction/v1_0/routes.py +++ b/aries_cloudagent/protocols/endorse_transaction/v1_0/routes.py @@ -14,6 +14,9 @@ from ....admin.request_context import AdminRequestContext from ....connections.models.conn_record import ConnRecord +from ....core.event_bus import Event, EventBus +from ....core.profile import Profile +from ....core.util import STARTUP_EVENT_PATTERN, SHUTDOWN_EVENT_PATTERN from ....indy.issuer import IndyIssuerError from ....ledger.error import LedgerError from ....messaging.models.base import BaseModelError @@ -694,6 +697,22 @@ async def transaction_write(request: web.BaseRequest): return web.json_response(tx_completed.serialize()) +def register_events(event_bus: EventBus): + """Subscribe to any events we need to support.""" + event_bus.subscribe(STARTUP_EVENT_PATTERN, on_startup_event) + event_bus.subscribe(SHUTDOWN_EVENT_PATTERN, on_shutdown_event) + + +async def on_startup_event(profile: Profile, event: Event): + """Handle any events we need to support.""" + print(">>> Received STARTUP event") + + +async def on_shutdown_event(profile: Profile, event: Event): + """Handle any events we need to support.""" + print(">>> Received SHUTDOWN event") + + async def register(app: web.Application): """Register routes."""