Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/event bus #1063

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
24f0178
feat: add simple event bus
dbluhm Feb 4, 2021
8a762c6
feat: emit event on webhooks
dbluhm Feb 4, 2021
3d2a216
feat: log exceptions and continue processing events
dbluhm Feb 4, 2021
838c260
Merge branch 'main' into feature/event-bus
dbluhm Feb 11, 2021
d07a498
feat: use context instead of profile in event bus
dbluhm Feb 11, 2021
9e79bfd
feat: allow specifying reply_from_verkey in send
dbluhm Feb 19, 2021
3dbb81e
feat: accept profile only in event notify
dbluhm Feb 19, 2021
3d4ef09
feat: add sent_time to basicmessage webhook
dbluhm Feb 26, 2021
10b8134
fix: basicmessage handler tests
dbluhm Feb 26, 2021
ae32a16
AdminServer is now the only one to send_webhook
farskipper Mar 13, 2021
fad1502
adding back responder.send_webhook with a deprecation warning
farskipper Mar 25, 2021
36cdc8d
mapping events to webhooks
farskipper Mar 25, 2021
2dc86d7
base record replace webhooks with events
farskipper Apr 5, 2021
4218259
split up admin server event subscribers
farskipper Apr 5, 2021
99bcd4d
Merge pull request #59 from Indicio-tech/event-bus-webhook-refactor-b…
dbluhm Apr 5, 2021
fd29562
Merge pull request #58 from Indicio-tech/event-bus-webhook-refactor
dbluhm Apr 5, 2021
c8c31b0
Merge branch 'main' into feature/event-bus
farskipper Apr 5, 2021
3f9f11d
Merge branch 'main' into feature/event-bus
farskipper Apr 8, 2021
9470be5
Merge branch 'main' into feature/event-bus
farskipper Apr 8, 2021
cdde03f
fix: event bus forwarding to webhook
farskipper Apr 8, 2021
9413e09
Merge branch 'main' into feature/event-bus
dbluhm Apr 13, 2021
c3f0ba1
style: fix formatting with black
dbluhm Apr 13, 2021
d6a10c9
Merge branch 'main' into feature/event-bus
dbluhm Apr 13, 2021
b1e9a2c
Merge branch 'main' into feature/event-bus
dbluhm Apr 20, 2021
39f2c9a
Merge branch 'main' into feature/event-bus
dbluhm Apr 23, 2021
708f9d3
Merge branch 'main' into feature/event-bus
dbluhm Apr 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions aries_cloudagent/admin/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@


from abc import ABC, abstractmethod
from typing import Sequence

from ..core.profile import Profile


class BaseAdminServer(ABC):
Expand All @@ -23,20 +20,3 @@ async def start(self) -> None:
@abstractmethod
async def stop(self) -> None:
"""Stop the webserver."""

@abstractmethod
def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""

@abstractmethod
def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""

@abstractmethod
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
121 changes: 52 additions & 69 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,57 @@
"""Admin server classes."""

import asyncio
from hmac import compare_digest
import logging
import re
from typing import Callable, Coroutine
import uuid
import warnings

from typing import Callable, Coroutine, Sequence, Set

import aiohttp_cors
import jwt

from hmac import compare_digest
from aiohttp import web
from aiohttp_apispec import (
docs,
response_schema,
setup_aiohttp_apispec,
validation_middleware,
)

import aiohttp_cors
import jwt
from marshmallow import fields

from ..config.injection_context import InjectionContext
from ..core.profile import Profile
from ..core.event_bus import Event, EventBus
from ..core.plugin_registry import PluginRegistry
from ..core.profile import Profile
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.responder import BaseResponder
from ..transport.queue.basic import BasicMessageQueue
from ..multitenant.manager import MultitenantManager, MultitenantManagerError
from ..storage.error import StorageNotFoundError
from ..transport.outbound.message import OutboundMessage
from ..transport.queue.basic import BasicMessageQueue
from ..utils.stats import Collector
from ..utils.task_queue import TaskQueue
from ..version import __version__
from ..multitenant.manager import MultitenantManager, MultitenantManagerError

from ..storage.error import StorageNotFoundError
from .base_server import BaseAdminServer
from .error import AdminSetupError
from .request_context import AdminRequestContext


LOGGER = logging.getLogger(__name__)

EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")

EVENT_WEBHOOK_MAPPING = {
"acapy::basicmessage::received": "basicmessages",
"acapy::problem_report": "problem_report",
"acapy::ping::received": "ping",
"acapy::ping::response_received": "ping",
"acapy::actionmenu::received": "actionmenu",
"acapy::actionmenu::get-active-menu": "get-active-menu",
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
}


class AdminModulesSchema(OpenAPISchema):
"""Schema for the modules endpoint."""
Expand Down Expand Up @@ -93,7 +103,6 @@ def __init__(
self,
profile: Profile,
send: Coroutine,
webhook: Coroutine,
**kwargs,
):
"""
Expand All @@ -106,7 +115,6 @@ def __init__(
super().__init__(**kwargs)
self._profile = profile
self._send = send
self._webhook = webhook

async def send_outbound(self, message: OutboundMessage):
"""
Expand All @@ -119,53 +127,23 @@ async def send_outbound(self, message: OutboundMessage):

async def send_webhook(self, topic: str, payload: dict):
"""
Dispatch a webhook.
Dispatch a webhook. DEPRECATED: use the event bus instead.

Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
await self._webhook(self._profile, topic, payload)
warnings.warn(
"responder.send_webhook is deprecated; please use the event bus instead.",
DeprecationWarning,
)
await self._profile.notify("acapy::webhook::" + topic, payload)

@property
def send_fn(self) -> Coroutine:
"""Accessor for async function to send outbound message."""
return self._send

@property
def webhook_fn(self) -> Coroutine:
"""Accessor for the async function to dispatch a webhook."""
return self._webhook


class WebhookTarget:
"""Class for managing webhook target information."""

def __init__(
self,
endpoint: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Initialize the webhook target."""
self.endpoint = endpoint
self.max_attempts = max_attempts
self._topic_filter = None
self.topic_filter = topic_filter # call setter

@property
def topic_filter(self) -> Set[str]:
"""Accessor for the target's topic filter."""
return self._topic_filter

@topic_filter.setter
def topic_filter(self, val: Sequence[str]):
"""Setter for the target's topic filter."""
filt = set(val) if val else None
if filt and "*" in filt:
filt = None
self._topic_filter = filt


@web.middleware
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
Expand Down Expand Up @@ -270,7 +248,6 @@ def __init__(
self.root_profile = root_profile
self.task_queue = task_queue
self.webhook_router = webhook_router
self.webhook_targets = {}
self.websocket_queues = {}
self.site = None
self.multitenant_manager = context.inject(MultitenantManager, required=False)
Expand Down Expand Up @@ -371,7 +348,6 @@ async def setup_context(request: web.Request, handler):
responder = AdminResponder(
profile,
self.outbound_message_router,
self.send_webhook,
)
profile.context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -472,6 +448,19 @@ def sort_dict(raw: dict) -> dict:
if plugin_registry:
plugin_registry.post_process_routes(self.app)

event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
re.compile(re.escape(event_topic)),
lambda profile, event, webhook_topic=webhook_topic: self.send_webhook(
profile, webhook_topic, event.payload
),
)

# order tags alphabetically, parameters deterministically and pythonically
swagger_dict = self.app._state["swagger_dict"]
swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
Expand Down Expand Up @@ -799,21 +788,17 @@ async def websocket_handler(self, request):

return ws

def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""
self.webhook_targets[target_url] = WebhookTarget(
target_url, topic_filter, max_attempts
)
async def __on_webhook_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""
if target_url in self.webhook_targets:
del self.webhook_targets[target_url]
async def __on_record_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_RECORD.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
Expand All @@ -825,8 +810,6 @@ async def send_webhook(self, profile: Profile, topic: str, payload: dict):
metadata = {"x-wallet-id": wallet_id}

if self.webhook_router:
# for idx, target in self.webhook_targets.items():
# if not target.topic_filter or topic in target.topic_filter:
for endpoint in webhook_urls:
self.webhook_router(
topic,
Expand Down
33 changes: 0 additions & 33 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,6 @@
from ..server import AdminServer, AdminSetupError


class TestAdminResponder(AsyncTestCase):
async def test_admin_responder(self):
admin_responder = test_module.AdminResponder(
None, async_mock.CoroutineMock(), async_mock.CoroutineMock()
)

assert admin_responder.send_fn is admin_responder._send
assert admin_responder.webhook_fn is admin_responder._webhook

message = test_module.OutboundMessage(payload="hello")
await admin_responder.send_outbound(message)
assert admin_responder._send.called_once_with(None, message)

await admin_responder.send_webhook("topic", {"payload": "hello"})
assert admin_responder._webhook.called_once_with("topic", {"outbound": "hello"})


class TestWebhookTarget(AsyncTestCase):
async def test_webhook_target(self):
webhook_target = test_module.WebhookTarget(
endpoint="localhost:8888",
topic_filter=["birthdays", "animal videos"],
max_attempts=None,
)
assert webhook_target.topic_filter == {"birthdays", "animal videos"}

webhook_target.topic_filter = []
assert webhook_target.topic_filter is None

webhook_target.topic_filter = ["duct cleaning", "*"]
assert webhook_target.topic_filter is None


class TestAdminServer(AsyncTestCase):
async def setUp(self):
self.message_results = []
Expand Down
4 changes: 4 additions & 0 deletions aries_cloudagent/config/default_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ..cache.base import BaseCache
from ..cache.in_memory import InMemoryCache
from ..core.event_bus import EventBus
from ..core.plugin_registry import PluginRegistry
from ..core.profile import ProfileManager, ProfileManagerProvider
from ..core.protocol_registry import ProtocolRegistry
Expand Down Expand Up @@ -42,6 +43,9 @@ async def build_context(self) -> InjectionContext:
# Global protocol registry
context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry())

# Global event bus
context.injector.bind_instance(EventBus, EventBus())

# Global did resolver registry
did_resolver_registry = DIDResolverRegistry()
context.injector.bind_instance(DIDResolverRegistry, did_resolver_registry)
Expand Down
2 changes: 1 addition & 1 deletion aries_cloudagent/connections/models/conn_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool:
return self is ConnRecord.State.get(other)

RECORD_ID_NAME = "connection_id"
WEBHOOK_TOPIC = "connections"
RECORD_TOPIC = "connections"
LOG_STATE_FLAG = "debug.connections"
TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"}

Expand Down
6 changes: 0 additions & 6 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ async def setup(self):
self.dispatcher.task_queue,
self.get_stats,
)
webhook_urls = context.settings.get("admin.webhook_urls")
if webhook_urls:
for url in webhook_urls:
self.admin_server.add_webhook_target(url)
context.injector.bind_instance(BaseAdminServer, self.admin_server)
except Exception:
LOGGER.exception("Unable to register admin server")
Expand Down Expand Up @@ -206,7 +202,6 @@ async def start(self) -> None:
responder = AdminResponder(
self.root_profile,
self.admin_server.outbound_message_router,
self.admin_server.send_webhook,
)
context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -398,7 +393,6 @@ def inbound_message_router(
profile,
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
except (LedgerConfigError, LedgerTransactionError) as e:
Expand Down
Loading