Skip to content

Commit

Permalink
feat: refactor EventsRouter bettween sync and async router
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Oct 30, 2023
1 parent 6b9530b commit eb53c0d
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 125 deletions.
45 changes: 45 additions & 0 deletions event_routing_backends/backends/async_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
Generic async router to send events to hosts via celery.
"""
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent
from event_routing_backends.backends.events_router import EventsRouter

class AsyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event.delay(event_name, event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_bulk_events.delay(events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
dispatch_event_persistent.delay(event_name, event, router_type, host_configurations)
50 changes: 42 additions & 8 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.tasks import dispatch_bulk_events, dispatch_event, dispatch_event_persistent

logger = logging.getLogger(__name__)

Expand All @@ -17,7 +16,7 @@ class EventsRouter:
Router to send events to hosts using requests library.
"""

def __init__(self, processors=None, backend_name=None, sync=False):
def __init__(self, processors=None, backend_name=None):
"""
Initialize the router.
Expand All @@ -28,7 +27,6 @@ def __init__(self, processors=None, backend_name=None, sync=False):
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.sync = sync

def configure_host(self, host, router):
"""
Expand Down Expand Up @@ -140,7 +138,7 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
self.dispatch_bulk_events(
prepared_events,
host['router_type'],
host['host_configurations']
Expand All @@ -161,17 +159,18 @@ def send(self, event):

for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
func = dispatch_event_persistent if is_business_critical else dispatch_event
func = func if self.sync else func.delay
from event_routing_backends.backends.sync_events_router import SyncEventsRouter
#if event_name == 'edx.course.enrollment.activated' and self.__class__ == SyncEventsRouter:
# raise ValueError(f"event_name {event_name}, host {host}, is_business_critical {is_business_critical}")
if is_business_critical:
func(
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations'],
)
else:
func(
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
Expand Down Expand Up @@ -219,3 +218,38 @@ def overwrite_event_data(self, event, host, event_name):
host['override_args']
))
return event

def dispatch_event(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event is not implemented')

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_bulk_events is not implemented')

def dispatch_event_persistent(self, event_name, updated_event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
raise NotImplementedError('dispatch_event_persistent is not implemented')
45 changes: 45 additions & 0 deletions event_routing_backends/backends/sync_events_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
Generic router to send events to hosts.
"""
from event_routing_backends.tasks import send_event, bulk_send_events
from event_routing_backends.backends.events_router import EventsRouter

class SyncEventsRouter(EventsRouter):
"""
Router to send events to hosts via celery using requests library.
"""

def dispatch_event(self, event_name, event, router_type, host_configurations):
"""
Dispatch the event to the configured router.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
send_event(None, event_name, event, router_type, host_configurations)

def dispatch_bulk_events(self, events, router_type, host_configurations):
"""
Dispatch the a list of events to the configured router in bulk.
Arguments:
events (list[dict]): list of processed event dictionaries
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
bulk_send_events(None, events, router_type, host_configurations)

def dispatch_event_persistent(self, event_name, event, router_type, host_configurations):
"""
Dispatch the event to the configured router providing persistent storage.
Arguments:
event_name (str): name of the original event.
updated_event (dict): processed event dictionary
router_type (str): type of the router
host_configurations (dict): host configurations dict
"""
self.dispatch_event(event_name, event, router_type, host_configurations)

Check warning on line 45 in event_routing_backends/backends/sync_events_router.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/backends/sync_events_router.py#L45

Added line #L45 was not covered by tests
Loading

0 comments on commit eb53c0d

Please sign in to comment.