Skip to content

Commit

Permalink
Merge pull request #59 from Indicio-tech/event-bus-webhook-refactor-b…
Browse files Browse the repository at this point in the history
…ase_record

Base record replace webhooks with events
  • Loading branch information
dbluhm authored Apr 5, 2021
2 parents 36cdc8d + 4218259 commit 99bcd4d
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 61 deletions.
26 changes: 19 additions & 7 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@

LOGGER = logging.getLogger(__name__)

EVENT_PATTERN_ACAPY = re.compile("^acapy::(.*)$")
EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")

EVENT_WEBHOOK_MAPPING = {
"acapy::basicmessage::received": "basicmessages",
Expand Down Expand Up @@ -416,7 +416,16 @@ def sort_dict(raw: dict) -> dict:

event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_ACAPY, self.__on_acapy_event)
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
re.compile(re.escape(event_topic)),
lambda profile, event: self.send_webhook(
profile, webhook_topic, event.payload
),
)

# order tags alphabetically, parameters deterministically and pythonically
swagger_dict = self.app._state["swagger_dict"]
Expand Down Expand Up @@ -706,12 +715,15 @@ async def websocket_handler(self, request):

return ws

async def __on_acapy_event(self, profile: Profile, event: Event):
webhook_topic = EVENT_WEBHOOK_MAPPING.get(event.topic)
if not webhook_topic:
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
async def __on_webhook_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def __on_record_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_RECORD.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

Expand Down
2 changes: 1 addition & 1 deletion aries_cloudagent/connections/models/conn_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool:
return self is ConnRecord.State.get(other)

RECORD_ID_NAME = "connection_id"
WEBHOOK_TOPIC = "connections"
RECORD_TOPIC = "connections"
LOG_STATE_FLAG = "debug.connections"
TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"}

Expand Down
57 changes: 20 additions & 37 deletions aries_cloudagent/messaging/models/base_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid

from datetime import datetime
from typing import Any, Mapping, Sequence, Union
from typing import Any, Mapping, Optional, Sequence, Union

from marshmallow import fields

Expand Down Expand Up @@ -69,7 +69,7 @@ class Meta:
DEFAULT_CACHE_TTL = 60
RECORD_ID_NAME = "id"
RECORD_TYPE = None
WEBHOOK_TOPIC = None
RECORD_TOPIC: Optional[str] = None
LOG_STATE_FLAG = None
TAG_NAMES = {"state"}

Expand Down Expand Up @@ -301,15 +301,15 @@ async def save(
reason: str = None,
log_params: Mapping[str, Any] = None,
log_override: bool = False,
webhook: bool = None,
event: bool = None,
) -> str:
"""Persist the record to storage.
Args:
session: The profile session to use
reason: A reason to add to the log
log_params: Additional parameters to log
webhook: Flag to override whether the webhook is sent
event: Flag to override whether the event is sent
"""
new_record = None
log_reason = reason or ("Updated record" if self._id else "Created record")
Expand All @@ -335,7 +335,7 @@ async def save(
log_reason, params, override=log_override, settings=session.settings
)

await self.post_save(session, new_record, self._last_state, webhook)
await self.post_save(session, new_record, self._last_state, event)
self._last_state = self.state

return self._id
Expand All @@ -344,24 +344,21 @@ async def post_save(
self,
session: ProfileSession,
new_record: bool,
last_state: str,
webhook: bool = None,
last_state: Optional[str],
event: bool = None,
):
"""Perform post-save actions.
Args:
session: The profile session to use
new_record: Flag indicating if the record was just created
last_state: The previous state value
webhook: Adjust whether the webhook is called
event: Flag to override whether the event is sent
"""
webhook_topic = self.webhook_topic
if webhook is None:
webhook = bool(webhook_topic) and (new_record or (last_state != self.state))
if webhook:
await self.send_webhook(
session, self.webhook_payload, topic=self.webhook_topic
)
if event is None:
event = new_record or (last_state != self.state)
if event:
await self.emit_event(session, self.serialize())

async def delete_record(self, session: ProfileSession):
"""Remove the stored record.
Expand All @@ -374,34 +371,20 @@ async def delete_record(self, session: ProfileSession):
await storage.delete_record(self.storage_record)
# FIXME - update state and send webhook?

@property
def webhook_payload(self):
"""Return a JSON-serialized version of the record for the webhook."""
return self.serialize()

@property
def webhook_topic(self):
"""Return the webhook topic value."""
return self.WEBHOOK_TOPIC

async def send_webhook(
self, session: ProfileSession, payload: Any, topic: str = None
):
"""Send a standard webhook.
async def emit_event(self, session: ProfileSession, payload: Any):
"""Emit an event.
Args:
session: The profile session to use
payload: The webhook payload
topic: The webhook topic, defaulting to WEBHOOK_TOPIC
payload: The event payload
"""
if not payload:

if not self.RECORD_TOPIC or not self.state or not payload:
return
if not topic:
topic = self.webhook_topic
if not topic:
return

await session.profile.notify("acapy::webhook::" + topic, payload)
await session.profile.notify(
f"acapy::record::{self.RECORD_TOPIC}::{self.state}", payload
)

@classmethod
def log_state(
Expand Down
19 changes: 11 additions & 8 deletions aries_cloudagent/messaging/models/tests/test_base_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async def test_post_save_new(self):
with async_mock.patch.object(
record, "post_save", async_mock.CoroutineMock()
) as post_save:
await record.save(session, reason="reason", webhook=True)
await record.save(session, reason="reason", event=True)
post_save.assert_called_once_with(session, True, None, True)
mock_storage.add_record.assert_called_once()

Expand All @@ -102,7 +102,7 @@ async def test_post_save_exist(self):
with async_mock.patch.object(
record, "post_save", async_mock.CoroutineMock()
) as post_save:
await record.save(session, reason="reason", webhook=False)
await record.save(session, reason="reason", event=False)
post_save.assert_called_once_with(session, False, last_state, False)
mock_storage.update_record.assert_called_once()

Expand Down Expand Up @@ -254,18 +254,21 @@ def test_skip_log(self, mock_print):
record.log_state("state", settings=None)
mock_print.assert_not_called()

async def test_webhook(self):
async def test_emit_event(self):
session = InMemoryProfile.test_session()
mock_event_bus = MockEventBus()
session.profile.context.injector.bind_instance(EventBus, mock_event_bus)
record = BaseRecordImpl()
payload = {"test": "payload"}
topic = "topic"
await record.send_webhook(session, None, None) # cover short circuit
await record.send_webhook(session, "hello", None) # cover short circuit
await record.send_webhook(session, payload, topic=topic)
await record.emit_event(session, None) # cover short circuit
await record.emit_event(session, payload) # cover short circuit
record.RECORD_TOPIC = "topic"
await record.emit_event(session, payload) # cover short circuit
assert mock_event_bus.events == []
record.state = "test_state"
await record.emit_event(session, payload)
assert mock_event_bus.events == [
(session.profile, Event("acapy::webhook::topic", payload))
(session.profile, Event("acapy::record::topic::test_state", payload))
]

async def test_tag_prefix(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Meta:

RECORD_TYPE = "credential_exchange_v10"
RECORD_ID_NAME = "credential_exchange_id"
WEBHOOK_TOPIC = "issue_credential"
RECORD_TOPIC = "issue_credential"
TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"}

INITIATOR_SELF = "self"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Meta:

RECORD_TYPE = "cred_ex_v20"
RECORD_ID_NAME = "cred_ex_id"
WEBHOOK_TOPIC = "issue_credential_v2_0"
RECORD_TOPIC = "issue_credential_v2_0"
TAG_NAMES = {"~thread_id"} if UNENCRYPTED_TAGS else {"thread_id"}

INITIATOR_SELF = "self"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Meta:
RECORD_ID_NAME = "cred_ex_dif_id"
RECORD_TYPE = "dif_cred_ex_v20"
TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"}
WEBHOOK_TOPIC = "issue_credential_v2_0_dif"
RECORD_TOPIC = "issue_credential_v2_0_dif"

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Meta:
RECORD_ID_NAME = "cred_ex_indy_id"
RECORD_TYPE = "indy_cred_ex_v20"
TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"}
WEBHOOK_TOPIC = "issue_credential_v2_0_indy"
RECORD_TOPIC = "issue_credential_v2_0_indy"

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Meta:

RECORD_TYPE = "oob_invitation"
RECORD_ID_NAME = "invitation_id"
WEBHOOK_TOPIC = "oob_invitation"
RECORD_TOPIC = "oob_invitation"
TAG_NAMES = {"invi_msg_id", "public_did"}

STATE_INITIAL = "initial"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Meta:

RECORD_TYPE = "presentation_exchange_v10"
RECORD_ID_NAME = "presentation_exchange_id"
WEBHOOK_TOPIC = "present_proof"
RECORD_TOPIC = "present_proof"
TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"}

INITIATOR_SELF = "self"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Meta:

RECORD_TYPE = "issuer_cred_rev"
RECORD_ID_NAME = "record_id"
WEBHOOK_TOPIC = "issuer_cred_rev"
RECORD_TOPIC = "issuer_cred_rev"
TAG_NAMES = {
"cred_ex_id",
"cred_def_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Meta:

RECORD_ID_NAME = "record_id"
RECORD_TYPE = "issuer_rev_reg"
WEBHOOK_TOPIC = "revocation_registry"
RECORD_TOPIC = "revocation_registry"
LOG_STATE_FLAG = "debug.revocation"
TAG_NAMES = {
"cred_def_id",
Expand Down

0 comments on commit 99bcd4d

Please sign in to comment.