Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batching for event bus consumer #388

Merged
merged 8 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from django.conf import settings
from django_redis import get_redis_connection
from eventtracking.backends.logger import DateTimeJSONEncoder
from eventtracking.processors.exceptions import EventEmissionExit

from event_routing_backends.helpers import get_business_critical_events
Expand All @@ -15,6 +16,7 @@
logger = logging.getLogger(__name__)

EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}'
EVENTS_ROUTER_DEAD_QUEUE_FORMAT = 'dead_queue_{}'
EVENTS_ROUTER_LAST_SENT_FORMAT = 'last_sent_{}'


Expand All @@ -34,6 +36,7 @@ def __init__(self, processors=None, backend_name=None):
self.processors = processors if processors else []
self.backend_name = backend_name
self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name)
self.dead_queue = EVENTS_ROUTER_DEAD_QUEUE_FORMAT.format(self.backend_name)
self.last_sent_key = EVENTS_ROUTER_LAST_SENT_FORMAT.format(self.backend_name)

def configure_host(self, host, router):
Expand Down Expand Up @@ -126,6 +129,17 @@ def prepare_to_send(self, events):

return route_events

def get_failed_events(self):
"""
Get failed events from the dead queue.
"""
redis = get_redis_connection()
n = redis.llen(self.dead_queue)
if not n:
return []
failed_events = redis.rpop(self.dead_queue, n)
Ian2012 marked this conversation as resolved.
Show resolved Hide resolved
return [json.loads(event.decode('utf-8')) for event in failed_events]

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.
Expand Down Expand Up @@ -179,8 +193,8 @@ def send(self, event):
),
exc_info=True
)
logger.info('Re sending the batched events to the queue.')
redis.lpush(self.queue_name, *batch)
logger.info(f'Pushing failed events to the dead queue: {self.dead_queue}')
redis.lpush(self.dead_queue, *batch)
return

event_routes = self.prepare_to_send([event])
Expand All @@ -207,7 +221,7 @@ def queue_event(self, redis, event):

"""
event["timestamp"] = event["timestamp"].isoformat()
queue_size = redis.lpush(self.queue_name, json.dumps(event))
queue_size = redis.lpush(self.queue_name, json.dumps(event, cls=DateTimeJSONEncoder))
logger.info(f'Event {event["name"]} has been queued for batching. Queue size: {queue_size}')

if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis):
Expand Down
30 changes: 28 additions & 2 deletions event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ def test_send_event_with_bulk_exception(
exc_info=True
)
mock_logger.info.assert_called_once_with(
'Re sending the batched events to the queue.'
f'Pushing failed events to the dead queue: {router.dead_queue}'
)
redis_mock.lpush.assert_called_once_with(router.queue_name, *[1])
redis_mock.lpush.assert_called_once_with(router.dead_queue, *[1])

@override_settings(
EVENT_ROUTING_BACKEND_BATCH_INTERVAL=1,
Expand Down Expand Up @@ -1230,3 +1230,29 @@ def test_failed_routing(self, mocked_remote_lrs):
router = SyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND)
with self.assertRaises(EventNotDispatched):
router.send(self.transformed_event)

@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.llen.return_value = 1
redis_mock.rpop.return_value = [json.dumps({'name': 'test', 'data': {'key': 'value'}}).encode('utf-8')]

router = SyncEventsRouter(processors=[], backend_name='test')
router.get_failed_events()

redis_mock.llen.assert_called_once_with(router.dead_queue)
redis_mock.rpop.assert_called_once_with(router.dead_queue, 1)


@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events_empty(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.llen.return_value = 0

router = SyncEventsRouter(processors=[], backend_name='test')
events = router.get_failed_events()

redis_mock.llen.assert_called_once_with(router.dead_queue)
self.assertEqual(events, [])
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Management command for resending events when a failure occurs.
"""

import logging
from textwrap import dedent

from django.conf import settings
from django.core.management.base import BaseCommand
from eventtracking.backends.async_routing import AsyncRoutingBackend
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.tracker import get_tracker

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""
Management command for resending events when a failure occurs
in the event routing backend.
"""

help = dedent(__doc__).strip()

def add_arguments(self, parser):
parser.add_argument(
"--transformer_type",
choices=["xapi", "caliper", "all"],
required=True,
help="The type of transformation to do, only one can be done at a time.",
Ian2012 marked this conversation as resolved.
Show resolved Hide resolved
)

def handle(self, *args, **options):
"""
Configure the command and start the transform process.
"""
logger.info("Recovering failed events")
transformer_type = options["transformer_type"]
tracker = get_tracker()

engines = {
name: engine
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}

if not engines:
logger.info("No compatible backend found.")
return

settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False
Ian2012 marked this conversation as resolved.
Show resolved Hide resolved

for name, engine in engines.items():
if transformer_type not in ("all", name):
logger.info("Skipping backend: {}".format(name))
continue
for backend_name, backend in engine.backends.items():
failed_events = backend.get_failed_events()
if not failed_events:
logger.info(
"No failed events found for backend: {}".format(backend_name)
)
continue
for event in failed_events:
backend.send(event)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
Tests for the transform_tracking_logs management command.
"""

from unittest.mock import Mock, patch

from django.core.management import call_command
from django.test import TestCase
from django.test.utils import override_settings
from eventtracking.django.django_tracker import DjangoTracker

from event_routing_backends.management.commands.recover_failed_events import Command as RecoverFailedEventsCommand


class TestRecoverFailedEvents(TestCase):
@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter",
"OPTIONS": {
"processors": [
{
"ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor",
"OPTIONS": {},
}
],
"backend_name": "xapi",
},
}
},
},
},
}
)
@patch("event_routing_backends.management.commands.recover_failed_events.get_tracker")
def test_send_tracking_log_to_backends(self, mock_get_tracker):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["event_bus"].backends["xapi"] = mock_backend
mock_backend.get_failed_events.return_value = [{"event": "event"}]

call_command(
'recover_failed_events',
transformer_type="all"
)

mock_backend.send.assert_called_once_with({"event": "event"})


@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter",
"OPTIONS": {
"processors": [
{
"ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor",
"OPTIONS": {},
}
],
"backend_name": "xapi",
},
}
},
},
},
"xapi": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter",
"OPTIONS": {
"processors": [
{
"ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor",
"OPTIONS": {},
}
],
"backend_name": "xapi",
},
}
},
},
},
}
)
@patch("event_routing_backends.management.commands.recover_failed_events.get_tracker")
def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["xapi"].backends["xapi"] = mock_backend
mock_backend.get_failed_events.return_value = []

call_command(
'recover_failed_events',
transformer_type="xapi"
)

mock_backend.send.assert_not_called()

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.logger.LoggerBackend",
"OPTIONS": {},
},
}
)
@patch("event_routing_backends.management.commands.recover_failed_events.get_tracker")
@patch("event_routing_backends.management.commands.recover_failed_events.logger")
def test_send_tracking_log_to_backends_no_engines(self, mock_logger, mock_get_tracker):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker

call_command(
'recover_failed_events',
transformer_type="all"
)

mock_logger.info.assert_any_call("Recovering failed events")
mock_logger.info.assert_any_call("No compatible backend found.")
Loading