Skip to content

Commit

Permalink
feat: add management command to consume event bus every N seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 6, 2024
1 parent b2553c1 commit 5dc5338
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 19 deletions.
45 changes: 45 additions & 0 deletions event_routing_backends/management/commands/send_to_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
Management command for transforming tracking log files.
"""

import logging
import time

Check warning on line 6 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L5-L6

Added lines #L5 - L6 were not covered by tests

from django.core.management.base import BaseCommand
from django_redis import get_redis_connection

Check warning on line 9 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L8-L9

Added lines #L8 - L9 were not covered by tests

from event_routing_backends.handlers import TRANSFORMED_EVENT_KEY_NAME, send_batch

Check warning on line 11 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L11

Added line #L11 was not covered by tests

logger = logging.getLogger(__name__)

Check warning on line 13 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L13

Added line #L13 was not covered by tests


class Command(BaseCommand):

Check warning on line 16 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L16

Added line #L16 was not covered by tests
"""
Transform tracking logs to an LRS or other output destination.
"""

def add_arguments(self, parser):

Check warning on line 21 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L21

Added line #L21 was not covered by tests
"""
Add the necessary arguments to the command.
"""
parser.add_argument(

Check warning on line 25 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L25

Added line #L25 was not covered by tests
"--wait_time",
type=int,
help="The number of seconds to wait between each batch send to the destination.",
required=True,
default=5,
)

def handle(self, *args, **options):

Check warning on line 33 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L33

Added line #L33 was not covered by tests
"""
Configure the command and start the transform process.
"""
redis = get_redis_connection()

Check warning on line 37 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L37

Added line #L37 was not covered by tests
while True:
queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
batch = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, queue_size)

Check warning on line 40 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L39-L40

Added lines #L39 - L40 were not covered by tests
if batch:
send_batch(batch)
logger.info("Successfully sent batch to the destination.")
logger.info("Sleeping for %s seconds.", options["wait_time"])
time.sleep(options["wait_time"])

Check warning on line 45 in event_routing_backends/management/commands/send_to_backends.py

View check run for this annotation

Codecov / codecov/patch

event_routing_backends/management/commands/send_to_backends.py#L42-L45

Added lines #L42 - L45 were not covered by tests
2 changes: 1 addition & 1 deletion event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,4 @@ def plugin_settings(settings):
}
})

settings.EVENT_ROUTING_BATCH_SIZE = 5
settings.EVENT_ROUTING_BATCH_SIZE = 100
47 changes: 29 additions & 18 deletions event_routing_backends/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
Test handlers for signals emitted by the analytics app
"""

import json
from datetime import datetime
from unittest.mock import Mock, patch

from django.test import TestCase
from django.test.utils import override_settings
from eventtracking.django.django_tracker import DjangoTracker
from openedx_events.analytics.data import TrackingLogData
import json
from event_routing_backends.handlers import send_tracking_log_to_backends, send_batch, add_to_batch

from event_routing_backends.handlers import add_to_batch, send_batch, send_tracking_log_to_backends


class TestHandlers(TestCase):
"""
Expand All @@ -29,7 +31,13 @@ class TestHandlers(TestCase):
@patch("event_routing_backends.handlers.send_batch")
@patch("event_routing_backends.handlers.add_to_batch")
@patch("event_routing_backends.handlers.get_redis_connection")
def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to_batch, mock_send_batch, mock_get_tracker):
def test_send_tracking_log_to_batch(
self,
mock_get_redis_connection,
mock_add_to_batch,
mock_send_batch,
mock_get_tracker,
):
"""
Test for send_tracking_log_to_backends
"""
Expand All @@ -51,12 +59,15 @@ def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to
signal=None,
tracking_log=tracking_log_data,
)
mock_add_to_batch.assert_called_once_with(mock_get_redis_connection.return_value, {
"name": tracking_log_data.name,
"timestamp": tracking_log_data.timestamp.isoformat(),
"data": json.loads(tracking_log_data.data),
"context": json.loads(tracking_log_data.context),
})
mock_add_to_batch.assert_called_once_with(
mock_get_redis_connection.return_value,
{
"name": tracking_log_data.name,
"timestamp": tracking_log_data.timestamp.isoformat(),
"data": json.loads(tracking_log_data.data),
"context": json.loads(tracking_log_data.context),
},
)

mock_send_batch.assert_called_once()

Expand All @@ -68,26 +79,22 @@ def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to
"processors": [
{
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor",
"OPTIONS": {
"whitelist": ["no_test_name"]
}
"OPTIONS": {"whitelist": ["no_test_name"]},
}
],
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.sync_events_router.SyncEventsRouter",
},
}
},
},
},
}
)
@patch("event_routing_backends.handlers.get_tracker")
@patch("event_routing_backends.handlers.isinstance")
@patch("event_routing_backends.handlers.logger")
def test_send_batch(
self, mock_logger, mock_is_instance, mock_get_tracker
):
def test_send_batch(self, mock_logger, mock_is_instance, mock_get_tracker):
"""
Test for send_batch
"""
Expand All @@ -106,7 +113,9 @@ def test_send_batch(
}
send_batch([json.dumps(event).encode("utf-8")])

mock_logger.info.assert_called_once_with("Sending events to backend [xapi] event bus in batch")
mock_logger.info.assert_called_once_with(
"Sending events to backend [xapi] event bus in batch"
)
mock_bulk_send.assert_called_once_with([event])

def test_add_to_batch(self):
Expand All @@ -121,4 +130,6 @@ def test_add_to_batch(self):
"context": {},
}
add_to_batch(mock_redis, event)
mock_redis.lpush.assert_called_once_with("transformed_events", json.dumps(event))
mock_redis.lpush.assert_called_once_with(
"transformed_events", json.dumps(event)
)

0 comments on commit 5dc5338

Please sign in to comment.