Skip to content

Commit

Permalink
Merge pull request #361 from openedx/cag/add-consumer-for-tracking-logs
Browse files Browse the repository at this point in the history
feat: allow to route events in sync
  • Loading branch information
Ian2012 authored Feb 19, 2024
2 parents 2363d2d + 2f71c62 commit 8bbd98c
Show file tree
Hide file tree
Showing 18 changed files with 1,009 additions and 392 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Change Log
Unreleased
~~~~~~~~~~

[8.1.0]
~~~~~~~

* Add support to consume tracking logs from the event bus.

[8.0.0]
~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.0.0'
__version__ = '8.1.0'
49 changes: 49 additions & 0 deletions event_routing_backends/backends/async_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Events router to send events to hosts via celery.
This events router will trigger a celery task to send the events to the
configured hosts.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent


class AsyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event.delay(event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_bulk_events.delay(events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event_persistent.delay(event_name, updated_event, router_type, host_configurations)
46 changes: 40 additions & 6 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -138,7 +137,7 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
self.dispatch_bulk_events(
prepared_events,
host['router_type'],
host['host_configurations']
Expand All @@ -160,18 +159,18 @@ def send(self, event):
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
dispatch_event_persistent.delay(
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)
else:
dispatch_event.delay(
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
host['host_configurations']
host['host_configurations'],
)

def process_event(self, event):
Expand Down Expand Up @@ -215,3 +214,38 @@ def overwrite_event_data(self, event, host, event_name):
host['override_args']
))
return event

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event is not implemented')

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_bulk_events is not implemented')

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event_persistent is not implemented')
51 changes: 51 additions & 0 deletions event_routing_backends/backends/sync_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Events router to send events to hosts in sync mode.
This router is expected to be used with the event bus, which
can be configured to use this router to send events to hosts
in the same thread as it process the events.
"""
from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.tasks import bulk_send_events, send_event


class SyncEventsRouter(EventsRouter):
"""
Router to send events to hosts using requests library.
"""

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
send_event(None, event_name, updated_event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
bulk_send_events(None, events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
In this case, the event bus is expected to provide the persistent storage layer.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
self.dispatch_event(event_name, updated_event, router_type, host_configurations)
Loading

0 comments on commit 8bbd98c

Please sign in to comment.