Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow to route events in sync #361

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Change Log
Unreleased
~~~~~~~~~~

[8.1.0]
~~~~~~~

* Add support to consume tracking logs from the event bus.

[8.0.0]
~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.0.0'
__version__ = '8.1.0'
49 changes: 49 additions & 0 deletions event_routing_backends/backends/async_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Events router to send events to hosts via celery.

This events router will trigger a celery task to send the events to the
configured hosts.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent


class AsyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event.delay(event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.

Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_bulk_events.delay(events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event_persistent.delay(event_name, updated_event, router_type, host_configurations)
46 changes: 40 additions & 6 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -138,7 +137,7 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
self.dispatch_bulk_events(
prepared_events,
host['router_type'],
host['host_configurations']
Expand All @@ -160,18 +159,18 @@ def send(self, event):
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)
else:
dispatch_event.delay(
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)

def process_event(self, event):
Expand Down Expand Up @@ -215,3 +214,38 @@ def overwrite_event_data(self, event, host, event_name):
host['override_args']
))
return event

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event is not implemented')

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.

Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_bulk_events is not implemented')

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event_persistent is not implemented')
51 changes: 51 additions & 0 deletions event_routing_backends/backends/sync_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Events router to send events to hosts in sync mode.

This router is expected to be used with the event bus, which
can be configured to use this router to send events to hosts
in the same thread as it process the events.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import bulk_send_events, send_event


class SyncEventsRouter(EventsRouter):
"""
Router to send events to hosts using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
send_event(None, event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.

Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
bulk_send_events(None, events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
In this case, the event bus is expected to provide the persistent storage layer.

Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
self.dispatch_event(event_name, updated_event, router_type, host_configurations)
Ian2012 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading