Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into cwu-bbs-main
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra committed Apr 28, 2021
2 parents 7cf00bc + 0b3de3f commit c6a1626
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 17 deletions.
12 changes: 6 additions & 6 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
LOGGER = logging.getLogger(__name__)

EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::([^:]*)(?:::.*)?$")

EVENT_WEBHOOK_MAPPING = {
"acapy::basicmessage::received": "basicmessages",
Expand Down Expand Up @@ -450,8 +450,8 @@ def sort_dict(raw: dict) -> dict:

event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self._on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self._on_record_event)

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
Expand Down Expand Up @@ -788,19 +788,19 @@ async def websocket_handler(self, request):

return ws

async def __on_webhook_event(self, profile: Profile, event: Event):
async def _on_webhook_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def __on_record_event(self, profile: Profile, event: Event):
async def _on_record_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_RECORD.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def send_webhook(self, profile: Profile, topic: str, payload: dict):
async def send_webhook(self, profile: Profile, topic: str, payload: dict = None):
"""Add a webhook to the queue, to send to all registered targets."""
wallet_id = profile.settings.get("wallet.id")
webhook_urls = profile.settings.get("admin.webhook_urls")
Expand Down
27 changes: 25 additions & 2 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

from aiohttp import ClientSession, DummyCookieJar, TCPConnector, web
from aiohttp.test_utils import unused_port
import pytest

from asynctest import TestCase as AsyncTestCase
from asynctest import mock as async_mock

from .. import server as test_module
from ...config.default_context import DefaultContextBuilder
from ...config.injection_context import InjectionContext
from ...core.event_bus import Event
from ...core.in_memory import InMemoryProfile
from ...core.protocol_registry import ProtocolRegistry
from ...transport.outbound.message import OutboundMessage
from ...utils.stats import Collector
from ...utils.task_queue import TaskQueue

from .. import server as test_module
from ..server import AdminServer, AdminSetupError


Expand Down Expand Up @@ -434,3 +435,25 @@ async def test_server_health_state(self):
) as response:
assert response.status == 503
await server.stop()


@pytest.fixture
async def server():
test_class = TestAdminServer()
await test_class.setUp()
yield test_class.get_admin_server()
await test_class.tearDown()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"event_topic, webhook_topic",
[("acapy::record::topic", "topic"), ("acapy::record::topic::state", "topic")],
)
async def test_on_record_event(server, event_topic, webhook_topic):
profile = InMemoryProfile.test_profile()
with async_mock.patch.object(
server, "send_webhook", async_mock.CoroutineMock()
) as mock_send_webhook:
await server._on_record_event(profile, Event(event_topic, None))
mock_send_webhook.assert_called_once_with(profile, webhook_topic, None)
17 changes: 12 additions & 5 deletions aries_cloudagent/messaging/models/base_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Meta:
RECORD_ID_NAME = "id"
RECORD_TYPE = None
RECORD_TOPIC: Optional[str] = None
EVENT_NAMESPACE: str = "acapy::record"
LOG_STATE_FLAG = None
TAG_NAMES = {"state"}

Expand Down Expand Up @@ -371,20 +372,26 @@ async def delete_record(self, session: ProfileSession):
await storage.delete_record(self.storage_record)
# FIXME - update state and send webhook?

async def emit_event(self, session: ProfileSession, payload: Any):
async def emit_event(self, session: ProfileSession, payload: Any = None):
"""Emit an event.
Args:
session: The profile session to use
payload: The event payload
"""

if not self.RECORD_TOPIC or not self.state or not payload:
if not self.RECORD_TOPIC:
return

await session.profile.notify(
f"acapy::record::{self.RECORD_TOPIC}::{self.state}", payload
)
if self.state:
topic = f"{self.EVENT_NAMESPACE}::{self.RECORD_TOPIC}::{self.state}"
else:
topic = f"{self.EVENT_NAMESPACE}::{self.RECORD_TOPIC}"

if not payload:
payload = self.serialize()

await session.profile.notify(topic, payload)

@classmethod
def log_state(
Expand Down
26 changes: 22 additions & 4 deletions aries_cloudagent/messaging/models/tests/test_base_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,29 @@ async def test_emit_event(self):
session.profile.context.injector.bind_instance(EventBus, mock_event_bus)
record = BaseRecordImpl()
payload = {"test": "payload"}
await record.emit_event(session, None) # cover short circuit
await record.emit_event(session, payload) # cover short circuit
record.RECORD_TOPIC = "topic"
await record.emit_event(session, payload) # cover short circuit

# Records must have topic to emit events
record.RECORD_TOPIC = None
await record.emit_event(session, payload)
assert mock_event_bus.events == []

record.RECORD_TOPIC = "topic"

# Stateless record with no payload emits event with serialized record
await record.emit_event(session)
assert mock_event_bus.events == [
(session.profile, Event("acapy::record::topic", {}))
]
mock_event_bus.events.clear()

# Stateless record with payload emits event
await record.emit_event(session, payload)
assert mock_event_bus.events == [
(session.profile, Event("acapy::record::topic", payload))
]
mock_event_bus.events.clear()

# Statefull record with payload emits event
record.state = "test_state"
await record.emit_event(session, payload)
assert mock_event_bus.events == [
Expand Down

0 comments on commit c6a1626

Please sign in to comment.