diff --git a/event_routing_backends/management/commands/send_to_backends.py b/event_routing_backends/management/commands/send_to_backends.py new file mode 100644 index 00000000..4cd47a68 --- /dev/null +++ b/event_routing_backends/management/commands/send_to_backends.py @@ -0,0 +1,45 @@ +""" +Management command for transforming tracking log files. +""" + +import logging +import time + +from django.core.management.base import BaseCommand +from django_redis import get_redis_connection + +from event_routing_backends.handlers import TRANSFORMED_EVENT_KEY_NAME, send_batch + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + """ + Transform tracking logs to an LRS or other output destination. + """ + + def add_arguments(self, parser): + """ + Add the necessary arguments to the command. + """ + parser.add_argument( + "--wait_time", + type=int, + help="The number of seconds to wait between each batch send to the destination.", + required=True, + default=5, + ) + + def handle(self, *args, **options): + """ + Configure the command and start the transform process. + """ + redis = get_redis_connection() + while True: + queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME) + batch = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, queue_size) + if batch: + send_batch(batch) + logger.info("Successfully sent batch to the destination.") + logger.info("Sleeping for %s seconds.", options["wait_time"]) + time.sleep(options["wait_time"]) diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index c2bdf4ed..31469b48 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -224,4 +224,4 @@ def plugin_settings(settings): } }) - settings.EVENT_ROUTING_BATCH_SIZE = 5 + settings.EVENT_ROUTING_BATCH_SIZE = 100 diff --git a/event_routing_backends/tests/test_handlers.py b/event_routing_backends/tests/test_handlers.py index 9fff4c2b..86c09f91 100644 --- a/event_routing_backends/tests/test_handlers.py +++ b/event_routing_backends/tests/test_handlers.py @@ -2,6 +2,7 @@ Test handlers for signals emitted by the analytics app """ +import json from datetime import datetime from unittest.mock import Mock, patch @@ -9,8 +10,9 @@ from django.test.utils import override_settings from eventtracking.django.django_tracker import DjangoTracker from openedx_events.analytics.data import TrackingLogData -import json -from event_routing_backends.handlers import send_tracking_log_to_backends, send_batch, add_to_batch + +from event_routing_backends.handlers import add_to_batch, send_batch, send_tracking_log_to_backends + class TestHandlers(TestCase): """ @@ -29,7 +31,13 @@ class TestHandlers(TestCase): @patch("event_routing_backends.handlers.send_batch") @patch("event_routing_backends.handlers.add_to_batch") @patch("event_routing_backends.handlers.get_redis_connection") - def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to_batch, mock_send_batch, mock_get_tracker): + def test_send_tracking_log_to_batch( + self, + mock_get_redis_connection, + mock_add_to_batch, + mock_send_batch, + mock_get_tracker, + ): """ Test for send_tracking_log_to_backends """ @@ -51,12 +59,15 @@ def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to signal=None, tracking_log=tracking_log_data, ) - mock_add_to_batch.assert_called_once_with(mock_get_redis_connection.return_value, { - "name": tracking_log_data.name, - "timestamp": tracking_log_data.timestamp.isoformat(), - "data": json.loads(tracking_log_data.data), - "context": json.loads(tracking_log_data.context), - }) + mock_add_to_batch.assert_called_once_with( + mock_get_redis_connection.return_value, + { + "name": tracking_log_data.name, + "timestamp": tracking_log_data.timestamp.isoformat(), + "data": json.loads(tracking_log_data.data), + "context": json.loads(tracking_log_data.context), + }, + ) mock_send_batch.assert_called_once() @@ -68,16 +79,14 @@ def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to "processors": [ { "ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor", - "OPTIONS": { - "whitelist": ["no_test_name"] - } + "OPTIONS": {"whitelist": ["no_test_name"]}, } ], "backends": { "xapi": { "ENGINE": "event_routing_backends.backends.sync_events_router.SyncEventsRouter", }, - } + }, }, }, } @@ -85,9 +94,7 @@ def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to @patch("event_routing_backends.handlers.get_tracker") @patch("event_routing_backends.handlers.isinstance") @patch("event_routing_backends.handlers.logger") - def test_send_batch( - self, mock_logger, mock_is_instance, mock_get_tracker - ): + def test_send_batch(self, mock_logger, mock_is_instance, mock_get_tracker): """ Test for send_batch """ @@ -106,7 +113,9 @@ def test_send_batch( } send_batch([json.dumps(event).encode("utf-8")]) - mock_logger.info.assert_called_once_with("Sending events to backend [xapi] event bus in batch") + mock_logger.info.assert_called_once_with( + "Sending events to backend [xapi] event bus in batch" + ) mock_bulk_send.assert_called_once_with([event]) def test_add_to_batch(self): @@ -121,4 +130,6 @@ def test_add_to_batch(self): "context": {}, } add_to_batch(mock_redis, event) - mock_redis.lpush.assert_called_once_with("transformed_events", json.dumps(event)) + mock_redis.lpush.assert_called_once_with( + "transformed_events", json.dumps(event) + )