Skip to content

Commit

Permalink
feat: send events in batch from routers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 22, 2024
1 parent 8bbd98c commit 87d7252
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 28 deletions.
33 changes: 32 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""
Generic router to send events to hosts.
"""
import json
import logging

from django.conf import settings
from django_redis import get_redis_connection
from eventtracking.processors.exceptions import EventEmissionExit

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration

logger = logging.getLogger(__name__)

EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}'


class EventsRouter:
"""
Expand All @@ -26,6 +31,7 @@ def __init__(self, processors=None, backend_name=None):
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name)

def configure_host(self, host, router):
"""
Expand Down Expand Up @@ -154,8 +160,15 @@ def send(self, event):
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])
if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED:
events = self.queue_event(event)
if not events:
return

self.bulk_send(events)
return

event_routes = self.prepare_to_send([event])
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
Expand All @@ -173,6 +186,24 @@ def send(self, event):
host['host_configurations'],
)

def queue_event(self, event):
"""
Queue the event to be sent to configured routers.
"""
redis = get_redis_connection()

event["timestamp"] = event["timestamp"].isoformat()
redis.lpush(self.queue_name, json.dumps(event))
queue_size = redis.llen(self.queue_name)

logger.info(f'Event {event["name"]} has been queued for batching.')
if queue_size < settings.EVENT_ROUTING_BACKEND_BATCH_SIZE:
return None
batch = redis.rpop(self.queue_name, queue_size)
events = [json.loads(queued_event.decode('utf-8')) for queued_event in batch]
return events

def process_event(self, event):
"""
Process the event through this router's processors.
Expand Down
43 changes: 42 additions & 1 deletion event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""
Test the EventsRouter
"""
import datetime
import json
from copy import copy
from unittest.mock import MagicMock, call, patch, sentinel

import ddt
from django.conf import settings
from django.test import TestCase
from django.test import TestCase, override_settings
from edx_django_utils.cache.utils import TieredCache
from eventtracking.processors.exceptions import EventEmissionExit
from tincan.statement import Statement
Expand Down Expand Up @@ -257,6 +260,44 @@ def test_duplicate_xapi_event_id(self, mocked_logger):
mocked_logger.info.mock_calls
)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.lpush.return_value = None
event1 = copy(self.transformed_event)
event1["timestamp"] = datetime.datetime.now()
event2 = copy(self.transformed_event)
event2["timestamp"] = datetime.datetime.now()
events = [event1, event2]
formatted_events = []
for event in events:
formatted_event = copy(event)
formatted_event["timestamp"] = formatted_event["timestamp"].isoformat()
formatted_events.append(json.dumps(formatted_event).encode('utf-8'))

redis_mock.rpop.return_value = formatted_events
redis_mock.llen.return_value = 1

router.send(event1)
redis_mock.llen.return_value = 2
router.send(event2)

redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1))
redis_mock.llen.assert_any_call(router.queue_name)
redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE)
mock_logger.info.assert_any_call(
f"Event {self.transformed_event['name']} has been queued for batching."
)
mock_bulk_send.assert_any_call(events)


@ddt.ddt
class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests
Expand Down
3 changes: 2 additions & 1 deletion event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def plugin_settings(settings):
settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1

settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 10
# .. setting_name: XAPI_AGENT_IFI_TYPE
# .. setting_default: 'external_id'
# .. setting_description: This setting can be used to specify the type of inverse functional identifier
Expand Down
8 changes: 8 additions & 0 deletions event_routing_backends/settings/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ def plugin_settings(settings):
'EVENT_ROUTING_BACKEND_COUNTDOWN',
settings.EVENT_ROUTING_BACKEND_COUNTDOWN
)
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = settings.ENV_TOKENS.get(
'EVENT_ROUTING_BACKEND_BATCH_SIZE',
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE
)
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = settings.ENV_TOKENS.get(
'EVENT_ROUTING_BACKEND_BATCHING_ENABLED',
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED
)
settings.CALIPER_EVENTS_ENABLED = settings.ENV_TOKENS.get(
'CALIPER_EVENTS_ENABLED',
settings.CALIPER_EVENTS_ENABLED
Expand Down
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ edx-celeryutils
apache-libcloud # For bulk event log loading
fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it
openedx-filters
django-redis
13 changes: 10 additions & 3 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ apache-libcloud==3.8.0
# via -r requirements/base.in
asgiref==3.7.2
# via django
async-timeout==4.0.3
# via redis
attrs==23.2.0
# via openedx-events
backports-zoneinfo[tzdata]==0.2.1
Expand Down Expand Up @@ -48,7 +50,7 @@ click-repl==0.3.0
# via celery
code-annotations==1.6.0
# via edx-toggles
cryptography==42.0.3
cryptography==42.0.4
# via django-fernet-fields-v2
django==3.2.24
# via
Expand All @@ -58,6 +60,7 @@ django==3.2.24
# django-crum
# django-fernet-fields-v2
# django-model-utils
# django-redis
# django-waffle
# djangorestframework
# edx-celeryutils
Expand All @@ -77,6 +80,8 @@ django-fernet-fields-v2==0.9
# via -r requirements/base.in
django-model-utils==4.4.0
# via edx-celeryutils
django-redis==5.4.0
# via -r requirements/base.in
django-waffle==4.1.0
# via
# edx-django-utils
Expand Down Expand Up @@ -119,7 +124,7 @@ markupsafe==2.1.5
# via jinja2
newrelic==9.6.0
# via edx-django-utils
openedx-events==9.5.1
openedx-events==9.5.2
# via event-tracking
openedx-filters==1.6.0
# via -r requirements/base.in
Expand Down Expand Up @@ -152,6 +157,8 @@ pytz==2024.1
# tincan
pyyaml==6.0.1
# via code-annotations
redis==5.0.1
# via django-redis
requests==2.31.0
# via
# -r requirements/base.in
Expand Down Expand Up @@ -181,7 +188,7 @@ tzdata==2024.1
# via
# backports-zoneinfo
# celery
urllib3==2.2.0
urllib3==2.2.1
# via requests
vine==5.1.0
# via
Expand Down
2 changes: 1 addition & 1 deletion requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ tomli==2.0.1
# via
# pyproject-api
# tox
tox==4.12.1
tox==4.13.0
# via -r requirements/ci.in
virtualenv==20.25.0
# via tox
23 changes: 17 additions & 6 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ astroid==3.0.3
# -r requirements/quality.txt
# pylint
# pylint-celery
async-timeout==4.0.3
# via
# -r requirements/quality.txt
# redis
attrs==23.2.0
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -106,11 +110,11 @@ colorama==0.4.6
# via
# -r requirements/ci.txt
# tox
coverage[toml]==7.4.1
coverage[toml]==7.4.2
# via
# -r requirements/quality.txt
# pytest-cov
cryptography==42.0.3
cryptography==42.0.4
# via
# -r requirements/quality.txt
# django-fernet-fields-v2
Expand All @@ -136,6 +140,7 @@ django==3.2.24
# django-crum
# django-fernet-fields-v2
# django-model-utils
# django-redis
# django-waffle
# djangorestframework
# edx-celeryutils
Expand All @@ -159,6 +164,8 @@ django-model-utils==4.4.0
# via
# -r requirements/quality.txt
# edx-celeryutils
django-redis==5.4.0
# via -r requirements/quality.txt
django-waffle==4.1.0
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -264,7 +271,7 @@ newrelic==9.6.0
# via
# -r requirements/quality.txt
# edx-django-utils
openedx-events==9.5.1
openedx-events==9.5.2
# via
# -r requirements/quality.txt
# event-tracking
Expand Down Expand Up @@ -363,7 +370,7 @@ pyproject-hooks==1.0.0
# -r requirements/pip-tools.txt
# build
# pip-tools
pytest==8.0.0
pytest==8.0.1
# via
# -r requirements/quality.txt
# pytest-cov
Expand Down Expand Up @@ -393,6 +400,10 @@ pyyaml==6.0.1
# -r requirements/quality.txt
# code-annotations
# edx-i18n-tools
redis==5.0.1
# via
# -r requirements/quality.txt
# django-redis
requests==2.31.0
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -441,7 +452,7 @@ tomlkit==0.12.3
# via
# -r requirements/quality.txt
# pylint
tox==4.12.1
tox==4.13.0
# via -r requirements/ci.txt
typing-extensions==4.9.0
# via
Expand All @@ -461,7 +472,7 @@ tzdata==2024.1
# -r requirements/quality.txt
# backports-zoneinfo
# celery
urllib3==2.2.0
urllib3==2.2.1
# via
# -r requirements/quality.txt
# requests
Expand Down
Loading

0 comments on commit 87d7252

Please sign in to comment.