Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add listener for tracking event emitted signal #305

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions event_routing_backends/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ def ready(self):
"""
super().ready()
# pylint: disable=import-outside-toplevel, unused-import
from event_routing_backends import signals # noqa: F401
from event_routing_backends.processors.caliper import event_transformers as caliper_event_transformers
from event_routing_backends.processors.xapi import event_transformers as xapi_event_transformers
14 changes: 14 additions & 0 deletions event_routing_backends/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
This module contains various configuration settings via
waffle switches for the Certificates app.
"""

from edx_toggles.toggles import SettingToggle, WaffleSwitch

# .. toggle_name: SEND_TRACKING_EVENT_EMITTED_SIGNAL
# .. toggle_implementation: SettingToggle
# .. toggle_default: False
# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The
# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted.
# .. toggle_use_cases: publish
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=False, module_name=__name__)
1 change: 1 addition & 0 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def plugin_settings(settings):
settings.XAPI_EVENTS_ENABLED = True
settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
settings.EVENT_ROUTING_BATCH_SIZE = 5

# .. setting_name: XAPI_AGENT_IFI_TYPE
# .. setting_default: 'external_id'
Expand Down
50 changes: 50 additions & 0 deletions event_routing_backends/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json
import logging

from django.conf import settings
from django.dispatch import receiver
from django_redis import get_redis_connection
from edx_django_utils.cache.utils import get_cache_key
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED
from openedx_events.event_bus import get_producer

from event_routing_backends.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL

logger = logging.getLogger(__name__)

TRANSFORMED_EVENT_KEY_NAME = "transformed_events"

@receiver(TRACKING_EVENT_EMITTED)
def listen_for_tracking_event_emitted_event(sender, signal, **kwargs):
"""
Publish `TRACKING_EVENT_EMITTED` events to the event bus.
"""
event = kwargs['tracking_log']

## TODO: Should we ignore events that we don't care about here or in the event routing backend config?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmtcril Please read my comments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible to ignore events at the tracking log configuration level, which may be enough for this and is probably better for overall system performance.


redis = get_redis_connection("default")

queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE:
queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, settings.EVENT_ROUTING_BATCH_SIZE)
queued_events.append(event)
## TODO: Send events to event bus in batch
logger.info("Sending events to event bus in batch")

#if SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled():
# get_producer().send(
# signal=TRACKING_EVENT_EMITTED,
# topic='analytics',
# event_key_field='tracking_log.name',
# event_data={'tracking_log': kwargs['tracking_log']},
# event_metadata=kwargs['metadata']
# )
Comment on lines +35 to +42
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmtcril I guess we would need to write the code for the event-bus consumer, but how will we send queued events in batch to the event bus?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's any additional code needed for the consumer. Running the consume_events command inside a configured edx-platform container will read events off the event bus and fire off this signal for events that match. The batching capability is in my current PR: https://github.com/openedx/event-routing-backends/pull/301/files#diff-f24cf45168ff0c054647f2bf8c504fef13b9acf37fd97eedfd7410cc61c06458R112

You would need to create an EventsRouter with the processors defined in configuration just like the AsyncRouter does (note: not like the management command does, it does not use the processors). Then you can call bulk_send on that and it will find all of the configured LRSs and POST the batch to them.

else:
redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({
"name": event.name,
"timestamp": event.timestamp,
"data": event.data,
"context": event.context,
}))
logger.info("Event pushed to the queue, current size: %s", queue_size + 1)