diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index ceaa98e227..5cd59749eb 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -41,8 +41,8 @@ LOGGER = logging.getLogger(__name__) -EVENT_PATTERN_ACAPY = re.compile("^acapy::(.*)$") EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$") +EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$") EVENT_WEBHOOK_MAPPING = { "acapy::basicmessage::received": "basicmessages", @@ -416,7 +416,16 @@ def sort_dict(raw: dict) -> dict: event_bus = self.context.inject(EventBus, required=False) if event_bus: - event_bus.subscribe(EVENT_PATTERN_ACAPY, self.__on_acapy_event) + event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event) + event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event) + + for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items(): + event_bus.subscribe( + re.compile(re.escape(event_topic)), + lambda profile, event: self.send_webhook( + profile, webhook_topic, event.payload + ), + ) # order tags alphabetically, parameters deterministically and pythonically swagger_dict = self.app._state["swagger_dict"] @@ -706,12 +715,15 @@ async def websocket_handler(self, request): return ws - async def __on_acapy_event(self, profile: Profile, event: Event): - webhook_topic = EVENT_WEBHOOK_MAPPING.get(event.topic) - if not webhook_topic: - match = EVENT_PATTERN_WEBHOOK.search(event.topic) - webhook_topic = match.group(1) if match else None + async def __on_webhook_event(self, profile: Profile, event: Event): + match = EVENT_PATTERN_WEBHOOK.search(event.topic) + webhook_topic = match.group(1) if match else None + if webhook_topic: + await self.send_webhook(profile, webhook_topic, event.payload) + async def __on_record_event(self, profile: Profile, event: Event): + match = EVENT_PATTERN_RECORD.search(event.topic) + webhook_topic = match.group(1) if match else None if webhook_topic: await self.send_webhook(profile, webhook_topic, event.payload) diff --git a/aries_cloudagent/connections/models/conn_record.py b/aries_cloudagent/connections/models/conn_record.py index 06f68cffd4..ffff844108 100644 --- a/aries_cloudagent/connections/models/conn_record.py +++ b/aries_cloudagent/connections/models/conn_record.py @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool: return self is ConnRecord.State.get(other) RECORD_ID_NAME = "connection_id" - WEBHOOK_TOPIC = "connections" + RECORD_TOPIC = "connections" LOG_STATE_FLAG = "debug.connections" TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"} diff --git a/aries_cloudagent/messaging/models/base_record.py b/aries_cloudagent/messaging/models/base_record.py index 9744739a4d..340ecc3ee0 100644 --- a/aries_cloudagent/messaging/models/base_record.py +++ b/aries_cloudagent/messaging/models/base_record.py @@ -5,7 +5,7 @@ import uuid from datetime import datetime -from typing import Any, Mapping, Sequence, Union +from typing import Any, Mapping, Optional, Sequence, Union from marshmallow import fields @@ -69,7 +69,7 @@ class Meta: DEFAULT_CACHE_TTL = 60 RECORD_ID_NAME = "id" RECORD_TYPE = None - WEBHOOK_TOPIC = None + RECORD_TOPIC: Optional[str] = None LOG_STATE_FLAG = None TAG_NAMES = {"state"} @@ -301,7 +301,7 @@ async def save( reason: str = None, log_params: Mapping[str, Any] = None, log_override: bool = False, - webhook: bool = None, + event: bool = None, ) -> str: """Persist the record to storage. @@ -309,7 +309,7 @@ async def save( session: The profile session to use reason: A reason to add to the log log_params: Additional parameters to log - webhook: Flag to override whether the webhook is sent + event: Flag to override whether the event is sent """ new_record = None log_reason = reason or ("Updated record" if self._id else "Created record") @@ -335,7 +335,7 @@ async def save( log_reason, params, override=log_override, settings=session.settings ) - await self.post_save(session, new_record, self._last_state, webhook) + await self.post_save(session, new_record, self._last_state, event) self._last_state = self.state return self._id @@ -344,8 +344,8 @@ async def post_save( self, session: ProfileSession, new_record: bool, - last_state: str, - webhook: bool = None, + last_state: Optional[str], + event: bool = None, ): """Perform post-save actions. @@ -353,15 +353,12 @@ async def post_save( session: The profile session to use new_record: Flag indicating if the record was just created last_state: The previous state value - webhook: Adjust whether the webhook is called + event: Flag to override whether the event is sent """ - webhook_topic = self.webhook_topic - if webhook is None: - webhook = bool(webhook_topic) and (new_record or (last_state != self.state)) - if webhook: - await self.send_webhook( - session, self.webhook_payload, topic=self.webhook_topic - ) + if event is None: + event = new_record or (last_state != self.state) + if event: + await self.emit_event(session, self.serialize()) async def delete_record(self, session: ProfileSession): """Remove the stored record. @@ -374,34 +371,20 @@ async def delete_record(self, session: ProfileSession): await storage.delete_record(self.storage_record) # FIXME - update state and send webhook? - @property - def webhook_payload(self): - """Return a JSON-serialized version of the record for the webhook.""" - return self.serialize() - - @property - def webhook_topic(self): - """Return the webhook topic value.""" - return self.WEBHOOK_TOPIC - - async def send_webhook( - self, session: ProfileSession, payload: Any, topic: str = None - ): - """Send a standard webhook. + async def emit_event(self, session: ProfileSession, payload: Any): + """Emit an event. Args: session: The profile session to use - payload: The webhook payload - topic: The webhook topic, defaulting to WEBHOOK_TOPIC + payload: The event payload """ - if not payload: + + if not self.RECORD_TOPIC or not self.state or not payload: return - if not topic: - topic = self.webhook_topic - if not topic: - return - await session.profile.notify("acapy::webhook::" + topic, payload) + await session.profile.notify( + f"acapy::record::{self.RECORD_TOPIC}::{self.state}", payload + ) @classmethod def log_state( diff --git a/aries_cloudagent/messaging/models/tests/test_base_record.py b/aries_cloudagent/messaging/models/tests/test_base_record.py index c75606a111..277df78133 100644 --- a/aries_cloudagent/messaging/models/tests/test_base_record.py +++ b/aries_cloudagent/messaging/models/tests/test_base_record.py @@ -86,7 +86,7 @@ async def test_post_save_new(self): with async_mock.patch.object( record, "post_save", async_mock.CoroutineMock() ) as post_save: - await record.save(session, reason="reason", webhook=True) + await record.save(session, reason="reason", event=True) post_save.assert_called_once_with(session, True, None, True) mock_storage.add_record.assert_called_once() @@ -102,7 +102,7 @@ async def test_post_save_exist(self): with async_mock.patch.object( record, "post_save", async_mock.CoroutineMock() ) as post_save: - await record.save(session, reason="reason", webhook=False) + await record.save(session, reason="reason", event=False) post_save.assert_called_once_with(session, False, last_state, False) mock_storage.update_record.assert_called_once() @@ -254,18 +254,21 @@ def test_skip_log(self, mock_print): record.log_state("state", settings=None) mock_print.assert_not_called() - async def test_webhook(self): + async def test_emit_event(self): session = InMemoryProfile.test_session() mock_event_bus = MockEventBus() session.profile.context.injector.bind_instance(EventBus, mock_event_bus) record = BaseRecordImpl() payload = {"test": "payload"} - topic = "topic" - await record.send_webhook(session, None, None) # cover short circuit - await record.send_webhook(session, "hello", None) # cover short circuit - await record.send_webhook(session, payload, topic=topic) + await record.emit_event(session, None) # cover short circuit + await record.emit_event(session, payload) # cover short circuit + record.RECORD_TOPIC = "topic" + await record.emit_event(session, payload) # cover short circuit + assert mock_event_bus.events == [] + record.state = "test_state" + await record.emit_event(session, payload) assert mock_event_bus.events == [ - (session.profile, Event("acapy::webhook::topic", payload)) + (session.profile, Event("acapy::record::topic::test_state", payload)) ] async def test_tag_prefix(self): diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py index 682f69238e..fc830bf534 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py @@ -22,7 +22,7 @@ class Meta: RECORD_TYPE = "credential_exchange_v10" RECORD_ID_NAME = "credential_exchange_id" - WEBHOOK_TOPIC = "issue_credential" + RECORD_TOPIC = "issue_credential" TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py index 7dd8821d8c..a5aa96500d 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py @@ -21,7 +21,7 @@ class Meta: RECORD_TYPE = "cred_ex_v20" RECORD_ID_NAME = "cred_ex_id" - WEBHOOK_TOPIC = "issue_credential_v2_0" + RECORD_TOPIC = "issue_credential_v2_0" TAG_NAMES = {"~thread_id"} if UNENCRYPTED_TAGS else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py index 2e8a73c9c6..e303b30a20 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py @@ -22,7 +22,7 @@ class Meta: RECORD_ID_NAME = "cred_ex_dif_id" RECORD_TYPE = "dif_cred_ex_v20" TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"} - WEBHOOK_TOPIC = "issue_credential_v2_0_dif" + RECORD_TOPIC = "issue_credential_v2_0_dif" def __init__( self, diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py index d47d9f76a5..8406716c74 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py @@ -22,7 +22,7 @@ class Meta: RECORD_ID_NAME = "cred_ex_indy_id" RECORD_TYPE = "indy_cred_ex_v20" TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"} - WEBHOOK_TOPIC = "issue_credential_v2_0_indy" + RECORD_TOPIC = "issue_credential_v2_0_indy" def __init__( self, diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py index 32f6f5476e..26a2546e9f 100644 --- a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py +++ b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py @@ -19,7 +19,7 @@ class Meta: RECORD_TYPE = "oob_invitation" RECORD_ID_NAME = "invitation_id" - WEBHOOK_TOPIC = "oob_invitation" + RECORD_TOPIC = "oob_invitation" TAG_NAMES = {"invi_msg_id", "public_did"} STATE_INITIAL = "initial" diff --git a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py index 1437b713d0..a969bf4bfa 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py @@ -21,7 +21,7 @@ class Meta: RECORD_TYPE = "presentation_exchange_v10" RECORD_ID_NAME = "presentation_exchange_id" - WEBHOOK_TOPIC = "present_proof" + RECORD_TOPIC = "present_proof" TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/revocation/models/issuer_cred_rev_record.py b/aries_cloudagent/revocation/models/issuer_cred_rev_record.py index af43cfc491..cb87c070b9 100644 --- a/aries_cloudagent/revocation/models/issuer_cred_rev_record.py +++ b/aries_cloudagent/revocation/models/issuer_cred_rev_record.py @@ -24,7 +24,7 @@ class Meta: RECORD_TYPE = "issuer_cred_rev" RECORD_ID_NAME = "record_id" - WEBHOOK_TOPIC = "issuer_cred_rev" + RECORD_TOPIC = "issuer_cred_rev" TAG_NAMES = { "cred_ex_id", "cred_def_id", diff --git a/aries_cloudagent/revocation/models/issuer_rev_reg_record.py b/aries_cloudagent/revocation/models/issuer_rev_reg_record.py index 11be63d572..e3e0e5885c 100644 --- a/aries_cloudagent/revocation/models/issuer_rev_reg_record.py +++ b/aries_cloudagent/revocation/models/issuer_rev_reg_record.py @@ -47,7 +47,7 @@ class Meta: RECORD_ID_NAME = "record_id" RECORD_TYPE = "issuer_rev_reg" - WEBHOOK_TOPIC = "revocation_registry" + RECORD_TOPIC = "revocation_registry" LOG_STATE_FLAG = "debug.revocation" TAG_NAMES = { "cred_def_id",