From 2d4c172b37153663d0b428f6df0d3423095f684f Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Wed, 21 Feb 2024 15:01:11 -0500 Subject: [PATCH] feat: send events in batch from routers --- .../backends/events_router.py | 34 ++++++++++++++- .../backends/tests/test_events_router.py | 43 ++++++++++++++++++- event_routing_backends/settings/common.py | 3 +- event_routing_backends/settings/production.py | 8 ++++ requirements/base.in | 1 + requirements/base.txt | 13 ++++-- requirements/ci.txt | 2 +- requirements/dev.txt | 23 +++++++--- requirements/doc.txt | 21 ++++++--- requirements/quality.txt | 21 ++++++--- requirements/test.txt | 21 ++++++--- test_settings.py | 2 + 12 files changed, 164 insertions(+), 28 deletions(-) diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 85832f4d..89cf7d77 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -1,6 +1,7 @@ """ Generic router to send events to hosts. """ +import json import logging from eventtracking.processors.exceptions import EventEmissionExit @@ -8,9 +9,14 @@ from event_routing_backends.helpers import get_business_critical_events from event_routing_backends.models import RouterConfiguration +from django_redis import get_redis_connection +from django.conf import settings + logger = logging.getLogger(__name__) +EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}' + class EventsRouter: """ Router to send events to hosts using requests library. 
@@ -26,6 +32,7 @@ def __init__(self, processors=None, backend_name=None):
         """
         self.processors = processors if processors else []
         self.backend_name = backend_name
+        self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name)
 
     def configure_host(self, host, router):
         """
@@ -154,8 +161,14 @@ def send(self, event):
         Arguments:
             event (dict): the original event dictionary
         """
-        event_routes = self.prepare_to_send([event])
+        if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED:
+            events = self.queue_event(event)
+            if not events:
+                return
+
+            return self.bulk_send(events)
 
+        event_routes = self.prepare_to_send([event])
         for events_for_route in event_routes.values():
             for event_name, updated_event, host, is_business_critical in events_for_route:
                 if is_business_critical:
@@ -173,6 +186,48 @@ def send(self, event):
                         host['host_configurations'],
                     )
 
+    def queue_event(self, event):
+        """
+        Queue the event in Redis and return a batch once enough events are queued.
+
+        The event is serialized to JSON and pushed onto the Redis list named
+        ``self.queue_name``. Once that list holds at least
+        ``settings.EVENT_ROUTING_BACKEND_BATCH_SIZE`` entries, the queued events
+        are popped and returned so that the caller can send them in bulk.
+
+        Arguments:
+            event (dict): the original event dictionary
+
+        Returns:
+            list or None: the de-serialized batch of events, or None when there
+                is no batch to send yet.
+        """
+        redis = get_redis_connection()
+
+        # Serialize a shallow copy so the dictionary handed in by the caller is
+        # not modified. Only values exposing ``isoformat`` are converted, so a
+        # timestamp that is already a string is queued unchanged.
+        serializable_event = dict(event)
+        timestamp = serializable_event.get("timestamp")
+        if hasattr(timestamp, "isoformat"):
+            serializable_event["timestamp"] = timestamp.isoformat()
+
+        redis.lpush(self.queue_name, json.dumps(serializable_event))
+        queue_size = redis.llen(self.queue_name)
+
+        logger.info(f'Event {event["name"]} has been queued for batching.')
+        if queue_size < settings.EVENT_ROUTING_BACKEND_BATCH_SIZE:
+            return None
+
+        # LLEN and RPOP are separate commands: if the list was emptied in between,
+        # RPOP returns None rather than a list.
+        batch = redis.rpop(self.queue_name, queue_size)
+        if not batch:
+            return None
+
+        # json.loads accepts bytes as well as str, so no explicit decoding is needed.
+        return [json.loads(queued_event) for queued_event in batch]
+
     def process_event(self, event):
         """
         Process the event through this router's processors.
diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py
index 45cc35ff..635e7089 100644
--- a/event_routing_backends/backends/tests/test_events_router.py
+++ b/event_routing_backends/backends/tests/test_events_router.py
@@ -1,11 +1,14 @@
 """
 Test the EventsRouter
 """
+import datetime
+import json
+from copy import copy
 from unittest.mock import MagicMock, call, patch, sentinel
 
 import ddt
 from django.conf import settings
-from django.test import TestCase
+from django.test import TestCase, override_settings
 from edx_django_utils.cache.utils import TieredCache
 from eventtracking.processors.exceptions import EventEmissionExit
 from tincan.statement import Statement
@@ -257,6 +260,60 @@ def test_duplicate_xapi_event_id(self, mocked_logger):
             mocked_logger.info.mock_calls
         )
 
+    @override_settings(
+        EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
+        EVENT_ROUTING_BACKEND_BATCH_SIZE=2
+    )
+    @patch('event_routing_backends.backends.events_router.get_redis_connection')
+    @patch('event_routing_backends.backends.events_router.logger')
+    @patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
+    def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection):
+        """
+        Events are queued in Redis and only sent in bulk once the batch is full.
+        """
+        router = EventsRouter(processors=[], backend_name='test')
+        redis_mock = MagicMock()
+        mock_get_redis_connection.return_value = redis_mock
+        redis_mock.lpush.return_value = None
+
+        sent_events = []
+        for _ in range(2):
+            event = copy(self.transformed_event)
+            event["timestamp"] = datetime.datetime.now()
+            sent_events.append(event)
+
+        # What the router is expected to push to, and read back from, Redis.
+        queued_events = [
+            {**event, "timestamp": event["timestamp"].isoformat()}
+            for event in sent_events
+        ]
+        serialized_events = [json.dumps(event) for event in queued_events]
+        redis_mock.rpop.return_value = [payload.encode('utf-8') for payload in serialized_events]
+
+        # First event: the queue is still below the batch size, nothing is sent.
+        redis_mock.llen.return_value = 1
+        router.send(sent_events[0])
+        redis_mock.rpop.assert_not_called()
+        mock_bulk_send.assert_not_called()
+        # Queueing must leave the event handed to the router untouched.
+        self.assertIsInstance(sent_events[0]["timestamp"], datetime.datetime)
+
+        # Second event: the batch is full, so the queue is drained and sent.
+        redis_mock.llen.return_value = 2
+        router.send(sent_events[1])
+
+        for payload in serialized_events:
+            redis_mock.lpush.assert_any_call(router.queue_name, payload)
+        redis_mock.llen.assert_any_call(router.queue_name)
+        redis_mock.rpop.assert_called_once_with(
+            router.queue_name,
+            settings.EVENT_ROUTING_BACKEND_BATCH_SIZE,
+        )
+        mock_logger.info.assert_any_call(
+            f"Event {self.transformed_event['name']} has been queued for batching."
+        )
+        mock_bulk_send.assert_called_once_with(queued_events)
+
 
 @ddt.ddt
 class TestAsyncEventsRouter(TestEventsRouter):  # pylint: disable=test-inherits-tests
diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py
index d859e887..7042e41e 100644
--- a/event_routing_backends/settings/common.py
+++ b/event_routing_backends/settings/common.py
@@ -15,7 +15,8 @@ def plugin_settings(settings):
     settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
     settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3
     settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1
-
+    settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True
+    settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 10
     # .. setting_name: XAPI_AGENT_IFI_TYPE
     # .. setting_default: 'external_id'
     # .. 
setting_description: This setting can be used to specify the type of inverse functional identifier diff --git a/event_routing_backends/settings/production.py b/event_routing_backends/settings/production.py index 2cd9e926..3c84fdd4 100644 --- a/event_routing_backends/settings/production.py +++ b/event_routing_backends/settings/production.py @@ -15,6 +15,14 @@ def plugin_settings(settings): 'EVENT_ROUTING_BACKEND_COUNTDOWN', settings.EVENT_ROUTING_BACKEND_COUNTDOWN ) + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCH_SIZE', + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE + ) + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCHING_ENABLED', + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED + ) settings.CALIPER_EVENTS_ENABLED = settings.ENV_TOKENS.get( 'CALIPER_EVENTS_ENABLED', settings.CALIPER_EVENTS_ENABLED diff --git a/requirements/base.in b/requirements/base.in index f9fdf05e..96c2d8a3 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -16,3 +16,4 @@ edx-celeryutils apache-libcloud # For bulk event log loading fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it openedx-filters +django-redis diff --git a/requirements/base.txt b/requirements/base.txt index 231a8394..9cc87233 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -12,6 +12,8 @@ apache-libcloud==3.8.0 # via -r requirements/base.in asgiref==3.7.2 # via django +async-timeout==4.0.3 + # via redis attrs==23.2.0 # via openedx-events backports-zoneinfo[tzdata]==0.2.1 @@ -48,7 +50,7 @@ click-repl==0.3.0 # via celery code-annotations==1.6.0 # via edx-toggles -cryptography==42.0.3 +cryptography==42.0.4 # via django-fernet-fields-v2 django==3.2.24 # via @@ -58,6 +60,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -77,6 +80,8 @@ 
django-fernet-fields-v2==0.9 # via -r requirements/base.in django-model-utils==4.4.0 # via edx-celeryutils +django-redis==5.4.0 + # via -r requirements/base.in django-waffle==4.1.0 # via # edx-django-utils @@ -119,7 +124,7 @@ markupsafe==2.1.5 # via jinja2 newrelic==9.6.0 # via edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via event-tracking openedx-filters==1.6.0 # via -r requirements/base.in @@ -152,6 +157,8 @@ pytz==2024.1 # tincan pyyaml==6.0.1 # via code-annotations +redis==5.0.1 + # via django-redis requests==2.31.0 # via # -r requirements/base.in @@ -181,7 +188,7 @@ tzdata==2024.1 # via # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via requests vine==5.1.0 # via diff --git a/requirements/ci.txt b/requirements/ci.txt index 742e39c1..bd57292b 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -32,7 +32,7 @@ tomli==2.0.1 # via # pyproject-api # tox -tox==4.12.1 +tox==4.13.0 # via -r requirements/ci.in virtualenv==20.25.0 # via tox diff --git a/requirements/dev.txt b/requirements/dev.txt index c7207bb8..b6303e42 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -25,6 +25,10 @@ astroid==3.0.3 # -r requirements/quality.txt # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/quality.txt + # redis attrs==23.2.0 # via # -r requirements/quality.txt @@ -106,11 +110,11 @@ colorama==0.4.6 # via # -r requirements/ci.txt # tox -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/quality.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/quality.txt # django-fernet-fields-v2 @@ -136,6 +140,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -159,6 +164,8 @@ django-model-utils==4.4.0 # via # -r requirements/quality.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/quality.txt django-waffle==4.1.0 # via # -r 
requirements/quality.txt @@ -264,7 +271,7 @@ newrelic==9.6.0 # via # -r requirements/quality.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/quality.txt # event-tracking @@ -363,7 +370,7 @@ pyproject-hooks==1.0.0 # -r requirements/pip-tools.txt # build # pip-tools -pytest==8.0.0 +pytest==8.0.1 # via # -r requirements/quality.txt # pytest-cov @@ -393,6 +400,10 @@ pyyaml==6.0.1 # -r requirements/quality.txt # code-annotations # edx-i18n-tools +redis==5.0.1 + # via + # -r requirements/quality.txt + # django-redis requests==2.31.0 # via # -r requirements/quality.txt @@ -441,7 +452,7 @@ tomlkit==0.12.3 # via # -r requirements/quality.txt # pylint -tox==4.12.1 +tox==4.13.0 # via -r requirements/ci.txt typing-extensions==4.9.0 # via @@ -461,7 +472,7 @@ tzdata==2024.1 # -r requirements/quality.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/quality.txt # requests diff --git a/requirements/doc.txt b/requirements/doc.txt index 2513b488..e8b5ec94 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -22,6 +22,10 @@ asgiref==3.7.2 # via # -r requirements/test.txt # django +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis attrs==23.2.0 # via # -r requirements/test.txt @@ -86,11 +90,11 @@ code-annotations==1.6.0 # via # -r requirements/test.txt # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/test.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/test.txt # django-fernet-fields-v2 @@ -105,6 +109,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -127,6 +132,8 @@ django-model-utils==4.4.0 # via # -r requirements/test.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/test.txt django-waffle==4.1.0 # via # -r requirements/test.txt @@ -241,7 +248,7 @@ newrelic==9.6.0 # 
edx-django-utils nh3==0.2.15 # via readme-renderer -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/test.txt # event-tracking @@ -297,7 +304,7 @@ pynacl==1.5.0 # edx-django-utils pyproject-hooks==1.0.0 # via build -pytest==8.0.0 +pytest==8.0.1 # via # -r requirements/test.txt # pytest-cov @@ -329,6 +336,10 @@ pyyaml==6.0.1 # code-annotations readme-renderer==42.0 # via twine +redis==5.0.1 + # via + # -r requirements/test.txt + # django-redis requests==2.31.0 # via # -r requirements/test.txt @@ -416,7 +427,7 @@ tzdata==2024.1 # -r requirements/test.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/test.txt # requests diff --git a/requirements/quality.txt b/requirements/quality.txt index f665ed61..b4c331a4 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -22,6 +22,10 @@ astroid==3.0.3 # via # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis attrs==23.2.0 # via # -r requirements/test.txt @@ -83,11 +87,11 @@ code-annotations==1.6.0 # -r requirements/test.txt # edx-lint # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/test.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/test.txt # django-fernet-fields-v2 @@ -103,6 +107,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -125,6 +130,8 @@ django-model-utils==4.4.0 # via # -r requirements/test.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/test.txt django-waffle==4.1.0 # via # -r requirements/test.txt @@ -209,7 +216,7 @@ newrelic==9.6.0 # via # -r requirements/test.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/test.txt # event-tracking @@ -268,7 +275,7 @@ pynacl==1.5.0 # via # -r requirements/test.txt # edx-django-utils -pytest==8.0.0 +pytest==8.0.1 # via # 
-r requirements/test.txt # pytest-cov @@ -297,6 +304,10 @@ pyyaml==6.0.1 # via # -r requirements/test.txt # code-annotations +redis==5.0.1 + # via + # -r requirements/test.txt + # django-redis requests==2.31.0 # via # -r requirements/test.txt @@ -348,7 +359,7 @@ tzdata==2024.1 # -r requirements/test.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/test.txt # requests diff --git a/requirements/test.txt b/requirements/test.txt index d87bcc3f..c6e0bed6 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -18,6 +18,10 @@ asgiref==3.7.2 # via # -r requirements/base.txt # django +async-timeout==4.0.3 + # via + # -r requirements/base.txt + # redis attrs==23.2.0 # via # -r requirements/base.txt @@ -75,9 +79,9 @@ code-annotations==1.6.0 # -r requirements/base.txt # -r requirements/test.in # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/base.txt # django-fernet-fields-v2 @@ -90,6 +94,7 @@ ddt==1.7.1 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -112,6 +117,8 @@ django-model-utils==4.4.0 # via # -r requirements/base.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/base.txt django-waffle==4.1.0 # via # -r requirements/base.txt @@ -182,7 +189,7 @@ newrelic==9.6.0 # via # -r requirements/base.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/base.txt # event-tracking @@ -217,7 +224,7 @@ pynacl==1.5.0 # via # -r requirements/base.txt # edx-django-utils -pytest==8.0.0 +pytest==8.0.1 # via # pytest-cov # pytest-django @@ -245,6 +252,10 @@ pyyaml==6.0.1 # via # -r requirements/base.txt # code-annotations +redis==5.0.1 + # via + # -r requirements/base.txt + # django-redis requests==2.31.0 # via # -r requirements/base.txt @@ -287,7 +298,7 @@ tzdata==2024.1 # -r requirements/base.txt # 
backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/base.txt # requests diff --git a/test_settings.py b/test_settings.py index 23efece7..34b2b797 100644 --- a/test_settings.py +++ b/test_settings.py @@ -47,5 +47,7 @@ def root(*args): RUNNING_WITH_TEST_SETTINGS = True EVENT_TRACKING_BACKENDS = {} XAPI_AGENT_IFI_TYPE = 'external_id' +EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False +EVENT_ROUTING_BACKEND_BATCH_SIZE = 1 _mock_third_party_modules()