Skip to content

Commit

Permalink
refactor: only send to event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Oct 25, 2023
1 parent 7487ffc commit 39c2a5e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 28 deletions.
2 changes: 1 addition & 1 deletion event_routing_backends/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The
# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted.
# .. toggle_use_cases: publish
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=False, module_name=__name__)
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=True, module_name=__name__)
35 changes: 8 additions & 27 deletions event_routing_backends/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from django.conf import settings
from django.dispatch import receiver
from django_redis import get_redis_connection
from edx_django_utils.cache.utils import get_cache_key
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED
from openedx_events.event_bus import get_producer

Expand All @@ -19,30 +17,13 @@ def listen_for_tracking_event_emitted_event(sender, signal, **kwargs):
"""
Publish `TRACKING_EVENT_EMITTED` events to the event bus.
"""
event = kwargs['tracking_log']

redis = get_redis_connection("default")

queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE:
queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, settings.EVENT_ROUTING_BATCH_SIZE)
queued_events.append(event)
## TODO: Send events to event bus in batch
if SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled():
logger.info("Sending events to event bus in batch")

#if SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled():
# get_producer().send(
# signal=TRACKING_EVENT_EMITTED,
# topic='analytics',
# event_key_field='tracking_log.name',
# event_data={'tracking_log': kwargs['tracking_log']},
# event_metadata=kwargs['metadata']
# )
else:
redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({
"name": event.name,
"timestamp": event.timestamp,
"data": event.data,
"context": event.context,
}))
logger.info("Event pushed to the queue, current size: %s", queue_size + 1)
get_producer().send(
signal=TRACKING_EVENT_EMITTED,
topic='analytics',
event_key_field='tracking_log.name',
event_data={'tracking_log': kwargs['tracking_log']},
event_metadata=kwargs['metadata']
)

0 comments on commit 39c2a5e

Please sign in to comment.