Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event bus Metadata #1429

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions aries_cloudagent/core/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
from contextlib import contextmanager
from itertools import chain
import logging
from typing import (
Any,
Expand All @@ -11,10 +10,13 @@
Dict,
Iterator,
List,
Match,
NamedTuple,
Optional,
Pattern,
TYPE_CHECKING,
)
from functools import partial

if TYPE_CHECKING: # To avoid circular import error
from .profile import Profile
Expand Down Expand Up @@ -50,6 +52,31 @@ def __repr__(self):
"""Return debug representation."""
return "<Event topic={}, payload={}>".format(self._topic, self._payload)

def with_metadata(self, metadata: "EventMetadata") -> "EventWithMetadata":
"""Annotate event with metadata and return EventWithMetadata object."""
return EventWithMetadata(self.topic, self.payload, metadata)


class EventMetadata(NamedTuple):
"""Metadata passed alongside events to add context."""

pattern: Pattern
match: Match[str]


class EventWithMetadata(Event):
"""Event with metadata passed alongside events to add context."""

def __init__(self, topic: str, payload: Any, metadata: EventMetadata):
"""Initialize event metadata."""
super().__init__(topic, payload)
self._metadata = metadata

@property
def metadata(self) -> EventMetadata:
"""Return metadata."""
return self._metadata


class EventBus:
"""A simple event bus implementation."""
Expand All @@ -71,15 +98,26 @@ async def notify(self, profile: "Profile", event: Event):
# TODO log errors but otherwise ignore?

LOGGER.debug("Notifying subscribers: %s", event)
matched = [
processor
for pattern, processor in self.topic_patterns_to_subscribers.items()
if pattern.match(event.topic)
]

for processor in chain(*matched):
partials = []
for pattern, subscribers in self.topic_patterns_to_subscribers.items():
match = pattern.match(event.topic)

if not match:
continue

for subscriber in subscribers:
partials.append(
partial(
subscriber,
profile,
event.with_metadata(EventMetadata(pattern, match)),
)
)

for processor in partials:
try:
await processor(profile, event)
await processor()
except Exception:
LOGGER.exception("Error occurred while processing event")

Expand Down