feat: add batching for event bus consumer #388

Merged (8 commits), Mar 22, 2024
CHANGELOG.rst (4 additions, 0 deletions)

@@ -14,6 +14,10 @@ Change Log
Unreleased
~~~~~~~~~~

[8.2.0]

* Add support for batching events in ``EventsRouter``.

[8.1.2]

* Add grade.now_* events to the xAPI supported events list.
docs/getting_started.rst (39 additions, 0 deletions)

@@ -220,6 +220,45 @@ A sample override for ``xapi`` backend is presented below. Here we are allowing
}
}

Batching Configuration
----------------------

Batching of events can be configured using the following settings:

#. ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED``: If set to ``True``, events are batched before being routed. Defaults to ``False``.
#. ``EVENT_ROUTING_BACKEND_BATCH_SIZE``: Maximum number of events batched together. Defaults to 100.
#. ``EVENT_ROUTING_BACKEND_BATCH_INTERVAL``: Time interval (in seconds) after which a pending batch is flushed even if it is not full. Defaults to 60.
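For example, batching could be enabled in a Django settings module as follows (the values shown are illustrative, not recommendations):

```python
# Illustrative batching configuration; tune for your deployment.
EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True  # opt in to batching
EVENT_ROUTING_BACKEND_BATCH_SIZE = 100         # flush once 100 events are queued
EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 60      # or once 60 seconds have passed
```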

Batching is done in the ``EventsRouter`` backend. If ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED`` is set to ``True``, then events will be batched together and routed to the configured routers after the specified interval or when the batch size is reached, whichever happens first.

If delivery fails (for example, due to downtime or network issues), the batch is queued again to avoid data loss. However, there is no guarantee that events will be routed in the same order as they were received.
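The re-queueing behaviour can be sketched with plain Python lists standing in for the Redis queues (a simplified model, not the actual ``EventsRouter`` code):

```python
def flush_batch(queue, dead_queue, deliver):
    """Deliver all queued events as one batch; if delivery fails,
    move the whole batch to the dead queue so no events are lost
    (their original ordering is not preserved)."""
    batch = list(queue)
    queue.clear()
    try:
        deliver(batch)
    except Exception:
        dead_queue.extend(batch)


def unreachable_lrs(batch):
    # Simulates a delivery failure, e.g. the LRS being down.
    raise RuntimeError("LRS unreachable")


queue = [{"name": "a"}, {"name": "b"}]
dead_queue = []
flush_batch(queue, dead_queue, unreachable_lrs)
# The failed batch now sits in dead_queue awaiting recovery.
```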

Event bus configuration
-----------------------

The event bus backend can be configured as the producer of the events, in which case events are consumed from the event bus and routed to the configured routers. It can be configured in Python as follows:

.. code-block:: python

EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend"
EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter"
EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"].pop("backend_name")
INSTALLED_APPS.append("openedx_events")
SEND_TRACKING_EVENT_EMITTED_SIGNAL = True
EVENT_BUS_PRODUCER_CONFIG = {
"org.openedx.analytics.tracking.event.emitted.v1": {
"analytics": {
"event_key_field": "tracking_log.name", "enabled": True
}
}
}

Once the event bus producer has been configured, the event bus consumer can be started using the following command:

.. code-block:: bash

./manage.py lms consume_events -t analytics -g event_routing_backends --extra '{"consumer_name": "event_routing_backends"}'
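The value passed to ``--extra`` is a JSON object of additional consumer options; the consumer name above, for instance, parses as:

```python
import json

# --extra is parsed as a JSON object of extra consumer options.
extra = json.loads('{"consumer_name": "event_routing_backends"}')
```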

OpenEdx Filters
===============

event_routing_backends/__init__.py (1 addition, 1 deletion)

@@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.1.2'
__version__ = '8.2.0'
event_routing_backends/backends/events_router.py (68 additions, 1 deletion)

@@ -1,15 +1,24 @@
"""
Generic router to send events to hosts.
"""
import json
import logging
from datetime import datetime, timedelta

from django.conf import settings
from django_redis import get_redis_connection
from eventtracking.backends.logger import DateTimeJSONEncoder
from eventtracking.processors.exceptions import EventEmissionExit

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration

logger = logging.getLogger(__name__)

EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}'
EVENTS_ROUTER_DEAD_QUEUE_FORMAT = 'dead_queue_{}'
EVENTS_ROUTER_LAST_SENT_FORMAT = 'last_sent_{}'


class EventsRouter:
"""
@@ -26,6 +35,9 @@ def __init__(self, processors=None, backend_name=None):
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name)
self.dead_queue = EVENTS_ROUTER_DEAD_QUEUE_FORMAT.format(self.backend_name)
self.last_sent_key = EVENTS_ROUTER_LAST_SENT_FORMAT.format(self.backend_name)

def configure_host(self, host, router):
"""
@@ -117,6 +129,17 @@ def prepare_to_send(self, events):

return route_events

def get_failed_events(self):
"""
Get failed events from the dead queue.
"""
redis = get_redis_connection()
n = redis.llen(self.dead_queue)
if not n:
return []
failed_events = redis.rpop(self.dead_queue, n)
return [json.loads(event.decode('utf-8')) for event in failed_events]

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.
@@ -154,8 +177,27 @@ def send(self, event):
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])
if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED:
redis = get_redis_connection()
batch = self.queue_event(redis, event)
if not batch:
return

try:
redis.set(self.last_sent_key, datetime.now().isoformat())
self.bulk_send([json.loads(queued_event.decode('utf-8')) for queued_event in batch])
except Exception: # pylint: disable=broad-except
logger.exception(
'Exception occurred while trying to bulk dispatch {} events.'.format(
len(batch)
),
exc_info=True
)
logger.info(f'Pushing failed events to the dead queue: {self.dead_queue}')
redis.lpush(self.dead_queue, *batch)
return

event_routes = self.prepare_to_send([event])
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
@@ -173,6 +215,31 @@ def send(self, event):
host['host_configurations'],
)

def queue_event(self, redis, event):
"""
Queue the event to be sent to configured routers.

"""
event["timestamp"] = event["timestamp"].isoformat()
queue_size = redis.lpush(self.queue_name, json.dumps(event, cls=DateTimeJSONEncoder))
logger.info(f'Event {event["name"]} has been queued for batching. Queue size: {queue_size}')

if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis):
batch = redis.rpop(self.queue_name, queue_size)
return batch

return None

def time_to_send(self, redis):
"""
Check if it is time to send the batched events.
"""
last_sent = redis.get(self.last_sent_key)
if not last_sent:
return True
time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8')))
return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)

def process_event(self, event):
"""
Process the event through this router's processors.
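The size-or-interval flush condition implemented by ``queue_event`` and ``time_to_send`` above can be mirrored with an in-memory sketch (plain Python standing in for Redis and the Django settings):

```python
from datetime import datetime, timedelta

BATCH_SIZE = 2       # stands in for EVENT_ROUTING_BACKEND_BATCH_SIZE
BATCH_INTERVAL = 60  # stands in for EVENT_ROUTING_BACKEND_BATCH_INTERVAL


def should_flush(queue_size, last_sent, now):
    """Flush when the queue reaches BATCH_SIZE, or when more than
    BATCH_INTERVAL seconds have passed since the last flush -
    whichever happens first."""
    if queue_size >= BATCH_SIZE:
        return True
    if last_sent is None:  # nothing sent yet: flush on the next event
        return True
    return (now - last_sent) > timedelta(seconds=BATCH_INTERVAL)


now = datetime(2024, 3, 22, 12, 0, 0)
assert should_flush(2, now, now)                              # batch size reached
assert should_flush(1, now - timedelta(seconds=61), now)      # interval elapsed
assert not should_flush(1, now - timedelta(seconds=10), now)  # keep queueing
```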
event_routing_backends/backends/tests/test_events_router.py (110 additions, 1 deletion)

@@ -1,11 +1,14 @@
"""
Test the EventsRouter
"""
import datetime
import json
from copy import copy
from unittest.mock import MagicMock, call, patch, sentinel

import ddt
from django.conf import settings
from django.test import TestCase
from django.test import TestCase, override_settings
from edx_django_utils.cache.utils import TieredCache
from eventtracking.processors.exceptions import EventEmissionExit
from tincan.statement import Statement
@@ -257,6 +260,87 @@ def test_duplicate_xapi_event_id(self, mocked_logger):
mocked_logger.info.mock_calls
)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.lpush.return_value = None
event1 = copy(self.transformed_event)
event1["timestamp"] = datetime.datetime.now()
event2 = copy(self.transformed_event)
event2["timestamp"] = datetime.datetime.now()
events = [event1, event2]
formatted_events = []
for event in events:
formatted_event = copy(event)
formatted_event["timestamp"] = formatted_event["timestamp"].isoformat()
formatted_events.append(json.dumps(formatted_event).encode('utf-8'))

redis_mock.rpop.return_value = formatted_events
redis_mock.lpush.return_value = 1
redis_mock.get.return_value.decode.return_value = datetime.datetime.now().isoformat()

router.send(event1)
redis_mock.lpush.return_value = 2
router.send(event2)

redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1))
redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE)
mock_logger.info.assert_any_call(
f"Event {self.transformed_event['name']} has been queued for batching. Queue size: 1"
)
mock_bulk_send.assert_any_call(events)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
@patch('event_routing_backends.backends.events_router.EventsRouter.queue_event')
def test_send_event_with_bulk_exception(
self,
mock_queue_event,
mock_bulk_send,
mock_logger,
mock_get_redis_connection
):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
mock_queue_event.return_value = [1]
mock_bulk_send.side_effect = EventNotDispatched

router.send(self.transformed_event)

mock_logger.exception.assert_called_once_with(
'Exception occurred while trying to bulk dispatch {} events.'.format(
1
),
exc_info=True
)
mock_logger.info.assert_called_once_with(
f'Pushing failed events to the dead queue: {router.dead_queue}'
)
redis_mock.lpush.assert_called_once_with(router.dead_queue, *[1])

@override_settings(
EVENT_ROUTING_BACKEND_BATCH_INTERVAL=1,
)
def test_time_to_send_no_data(self):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
redis_mock.get.return_value = None
self.assertTrue(router.time_to_send(redis_mock))


@ddt.ddt
class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests
@@ -1146,3 +1230,28 @@ def test_failed_routing(self, mocked_remote_lrs):
router = SyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND)
with self.assertRaises(EventNotDispatched):
router.send(self.transformed_event)

@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.llen.return_value = 1
redis_mock.rpop.return_value = [json.dumps({'name': 'test', 'data': {'key': 'value'}}).encode('utf-8')]

router = SyncEventsRouter(processors=[], backend_name='test')
router.get_failed_events()

redis_mock.llen.assert_called_once_with(router.dead_queue)
redis_mock.rpop.assert_called_once_with(router.dead_queue, 1)

@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events_empty(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.llen.return_value = 0

router = SyncEventsRouter(processors=[], backend_name='test')
events = router.get_failed_events()

redis_mock.llen.assert_called_once_with(router.dead_queue)
self.assertEqual(events, [])
event_routing_backends/helpers.py (1 addition, 1 deletion)

@@ -108,7 +108,7 @@ def get_user(username_or_id):
if username and not user:
try:
user = get_potentially_retired_user_by_username(username)
except Exception as ex:
except Exception as ex: # pylint: disable=broad-except
logger.info('User with username "%s" does not exist.%s', username, ex)

return user
New file (64 additions)

@@ -0,0 +1,64 @@
"""
Management command for resending events when a failure occurs.
"""

import logging
from textwrap import dedent

from django.conf import settings
from django.core.management.base import BaseCommand
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.tracker import get_tracker

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""
Management command for resending events when a failure occurs
in the event routing backend.
"""

help = dedent(__doc__).strip()

def add_arguments(self, parser):
parser.add_argument(
"--transformer_type",
choices=["xapi", "caliper", "all"],
required=True,
help="The type of transformation to do, only one can be done at a time.",
)

def handle(self, *args, **options):
"""
Configure the command and start the transform process.
"""
logger.info("Recovering failed events")
transformer_type = options["transformer_type"]
tracker = get_tracker()

engines = {
name: engine
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}

if not engines:
logger.info("No compatible backend found.")
return

settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False

for name, engine in engines.items():
if transformer_type not in ("all", name):
logger.info("Skipping backend: {}".format(name))
continue
for backend_name, backend in engine.backends.items():
failed_events = backend.get_failed_events()
if not failed_events:
logger.info(
"No failed events found for backend: {}".format(backend_name)
)
continue
for event in failed_events:
backend.send(event)
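The backend-selection logic in ``handle`` above reduces to a simple filter, sketched here standalone (plain dicts standing in for the tracker's engines):

```python
def select_engines(engines, transformer_type):
    """Return the engines the recovery loop would process: all of
    them for "all", otherwise only the matching backend name."""
    return {
        name: engine
        for name, engine in engines.items()
        if transformer_type in ("all", name)
    }


engines = {"xapi": object(), "caliper": object()}
assert set(select_engines(engines, "xapi")) == {"xapi"}
assert set(select_engines(engines, "all")) == {"xapi", "caliper"}
```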