Skip to content

Commit

Permalink
Merge pull request openwallet-foundation#58 from Indicio-tech/event-b…
Browse files Browse the repository at this point in the history
…us-webhook-refactor

AdminServer is now the only one to send_webhook
  • Loading branch information
dbluhm authored Apr 5, 2021
2 parents 10b8134 + 99bcd4d commit fd29562
Show file tree
Hide file tree
Showing 33 changed files with 228 additions and 263 deletions.
20 changes: 0 additions & 20 deletions aries_cloudagent/admin/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@


from abc import ABC, abstractmethod
from typing import Sequence

from ..core.profile import Profile


class BaseAdminServer(ABC):
Expand All @@ -23,20 +20,3 @@ async def start(self) -> None:
@abstractmethod
async def stop(self) -> None:
"""Stop the webserver."""

@abstractmethod
def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""

@abstractmethod
def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""

@abstractmethod
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
99 changes: 45 additions & 54 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import asyncio
import logging
from typing import Callable, Coroutine, Sequence, Set
import re
from typing import Callable, Coroutine
import uuid
import warnings

from aiohttp import web
from aiohttp_apispec import (
Expand Down Expand Up @@ -39,6 +41,19 @@

LOGGER = logging.getLogger(__name__)

EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")

EVENT_WEBHOOK_MAPPING = {
"acapy::basicmessage::received": "basicmessages",
"acapy::problem_report": "problem_report",
"acapy::ping::received": "ping",
"acapy::ping::response_received": "ping",
"acapy::actionmenu::received": "actionmenu",
"acapy::actionmenu::get-active-menu": "get-active-menu",
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
}


class AdminModulesSchema(OpenAPISchema):
"""Schema for the modules endpoint."""
Expand Down Expand Up @@ -75,7 +90,6 @@ def __init__(
self,
profile: Profile,
send: Coroutine,
webhook: Coroutine,
**kwargs,
):
"""
Expand All @@ -88,7 +102,6 @@ def __init__(
super().__init__(**kwargs)
self._profile = profile
self._send = send
self._webhook = webhook

async def send_outbound(self, message: OutboundMessage):
"""
Expand All @@ -101,44 +114,17 @@ async def send_outbound(self, message: OutboundMessage):

async def send_webhook(self, topic: str, payload: dict):
"""
Dispatch a webhook.
Dispatch a webhook. DEPRECATED: use the event bus instead.
Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
event_bus = self._profile.inject(EventBus)
await event_bus.notify(self._profile, Event(topic, payload))
await self._webhook(self._profile, topic, payload)


class WebhookTarget:
"""Class for managing webhook target information."""

def __init__(
self,
endpoint: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Initialize the webhook target."""
self.endpoint = endpoint
self.max_attempts = max_attempts
self._topic_filter = None
self.topic_filter = topic_filter # call setter

@property
def topic_filter(self) -> Set[str]:
"""Accessor for the target's topic filter."""
return self._topic_filter

@topic_filter.setter
def topic_filter(self, val: Sequence[str]):
"""Setter for the target's topic filter."""
filter = set(val) if val else None
if filter and "*" in filter:
filter = None
self._topic_filter = filter
warnings.warn(
"responder.send_webhook is deprecated; please use the event bus instead.",
DeprecationWarning,
)
await self._profile.notify("acapy::webhook::" + topic, payload)


@web.middleware
Expand Down Expand Up @@ -236,7 +222,6 @@ def __init__(
self.root_profile = root_profile
self.task_queue = task_queue
self.webhook_router = webhook_router
self.webhook_targets = {}
self.websocket_queues = {}
self.site = None
self.multitenant_manager = context.inject(MultitenantManager, required=False)
Expand Down Expand Up @@ -337,7 +322,6 @@ async def setup_context(request: web.Request, handler):
responder = AdminResponder(
profile,
self.outbound_message_router,
self.send_webhook,
)
profile.context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -430,6 +414,19 @@ def sort_dict(raw: dict) -> dict:
if plugin_registry:
plugin_registry.post_process_routes(self.app)

event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
re.compile(re.escape(event_topic)),
lambda profile, event: self.send_webhook(
profile, webhook_topic, event.payload
),
)

# order tags alphabetically, parameters deterministically and pythonically
swagger_dict = self.app._state["swagger_dict"]
swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
Expand Down Expand Up @@ -718,21 +715,17 @@ async def websocket_handler(self, request):

return ws

def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""
self.webhook_targets[target_url] = WebhookTarget(
target_url, topic_filter, max_attempts
)
async def __on_webhook_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""
if target_url in self.webhook_targets:
del self.webhook_targets[target_url]
async def __on_record_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_RECORD.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
Expand All @@ -744,8 +737,6 @@ async def send_webhook(self, profile: Profile, topic: str, payload: dict):
metadata = {"x-wallet-id": wallet_id}

if self.webhook_router:
# for idx, target in self.webhook_targets.items():
# if not target.topic_filter or topic in target.topic_filter:
for endpoint in webhook_urls:
self.webhook_router(
topic,
Expand Down
2 changes: 1 addition & 1 deletion aries_cloudagent/connections/models/conn_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool:
return self is ConnRecord.State.get(other)

RECORD_ID_NAME = "connection_id"
WEBHOOK_TOPIC = "connections"
RECORD_TOPIC = "connections"
LOG_STATE_FLAG = "debug.connections"
TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"}

Expand Down
6 changes: 0 additions & 6 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ async def setup(self):
self.dispatcher.task_queue,
self.get_stats,
)
webhook_urls = context.settings.get("admin.webhook_urls")
if webhook_urls:
for url in webhook_urls:
self.admin_server.add_webhook_target(url)
context.injector.bind_instance(BaseAdminServer, self.admin_server)
if "http" not in self.outbound_transport_manager.registered_schemes:
self.outbound_transport_manager.register("http")
Expand Down Expand Up @@ -200,7 +196,6 @@ async def start(self) -> None:
responder = AdminResponder(
self.root_profile,
self.admin_server.outbound_message_router,
self.admin_server.send_webhook,
)
context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -391,7 +386,6 @@ def inbound_message_router(
profile,
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
except (LedgerConfigError, LedgerTransactionError) as e:
Expand Down
23 changes: 8 additions & 15 deletions aries_cloudagent/core/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import logging
import os
from typing import Callable, Coroutine, Union
import warnings

from aiohttp.web import HTTPException

from ..core.profile import Profile
from ..core.event_bus import EventBus, Event
from ..messaging.agent_message import AgentMessage
from ..messaging.error import MessageParseError
from ..messaging.models.base import BaseModelError
Expand Down Expand Up @@ -89,7 +89,6 @@ def queue_message(
profile: Profile,
inbound_message: InboundMessage,
send_outbound: Coroutine,
send_webhook: Coroutine = None,
complete: Callable = None,
) -> PendingTask:
"""
Expand All @@ -99,15 +98,14 @@ def queue_message(
profile: The profile associated with the inbound message
inbound_message: The inbound message instance
send_outbound: Async function to send outbound messages
send_webhook: Async function to dispatch a webhook
complete: Function to call when the handler has completed
Returns:
A pending task instance resolving to the handler task
"""
return self.put_task(
self.handle_message(profile, inbound_message, send_outbound, send_webhook),
self.handle_message(profile, inbound_message, send_outbound),
complete,
)

Expand All @@ -116,7 +114,6 @@ async def handle_message(
profile: Profile,
inbound_message: InboundMessage,
send_outbound: Coroutine,
send_webhook: Coroutine = None,
):
"""
Configure responder and message context and invoke the message handler.
Expand All @@ -125,7 +122,6 @@ async def handle_message(
profile: The profile associated with the inbound message
inbound_message: The inbound message instance
send_outbound: Async function to send outbound messages
send_webhook: Async function to dispatch a webhook
Returns:
The response from the handler
Expand Down Expand Up @@ -157,7 +153,6 @@ async def handle_message(
context,
inbound_message,
send_outbound,
send_webhook,
reply_session_id=inbound_message.session_id,
reply_to_verkey=inbound_message.receipt.sender_verkey,
)
Expand Down Expand Up @@ -248,7 +243,6 @@ def __init__(
context: RequestContext,
inbound_message: InboundMessage,
send_outbound: Coroutine,
send_webhook: Coroutine = None,
**kwargs,
):
"""
Expand All @@ -258,14 +252,12 @@ def __init__(
context: The request context of the incoming message
inbound_message: The inbound message triggering this handler
send_outbound: Async function to send outbound message
send_webhook: Async function to dispatch a webhook
"""
super().__init__(**kwargs)
self._context = context
self._inbound_message = inbound_message
self._send = send_outbound
self._webhook = send_webhook

async def create_outbound(
self, message: Union[AgentMessage, str, bytes], **kwargs
Expand Down Expand Up @@ -301,13 +293,14 @@ async def send_outbound(self, message: OutboundMessage):

async def send_webhook(self, topic: str, payload: dict):
"""
Dispatch a webhook.
Dispatch a webhook. DEPRECATED: use the event bus instead.
Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
bus: EventBus = self._context.inject(EventBus)
await bus.notify(self._context.profile, Event(topic, payload))
if self._webhook:
await self._webhook(self._context.profile, topic, payload)
warnings.warn(
"responder.send_webhook is deprecated; please use the event bus instead.",
DeprecationWarning,
)
await self._context.profile.notify("acapy::webhook::" + topic, payload)
20 changes: 17 additions & 3 deletions aries_cloudagent/core/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import logging
from itertools import chain
from typing import Any, Callable, Dict, Pattern, Sequence
from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence

from ..core.profile import Profile
if TYPE_CHECKING: # To avoid circular import error
from .profile import Profile

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,7 +46,7 @@ def __init__(self):
"""Initialize Event Bus."""
self.topic_patterns_to_subscribers: Dict[Pattern, Sequence[Callable]] = {}

async def notify(self, profile: Profile, event: Event):
async def notify(self, profile: "Profile", event: Event):
"""Notify subscribers of event.
Args:
Expand Down Expand Up @@ -103,3 +104,16 @@ def unsubscribe(self, pattern: Pattern, processor: Callable):
if not self.topic_patterns_to_subscribers[pattern]:
del self.topic_patterns_to_subscribers[pattern]
LOGGER.debug("Unsubscribed: topic %s, processor %s", pattern, processor)


class MockEventBus(EventBus):
"""A mock EventBus for testing."""

def __init__(self):
"""Initialize MockEventBus."""
super().__init__()
self.events = []

async def notify(self, profile: "Profile", event: Event):
"""Append the event to MockEventBus.events."""
self.events.append((profile, event))
Loading

0 comments on commit fd29562

Please sign in to comment.