Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Batching failures when individual event errors bubble up #408

Merged
merged 4 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.3.0'
__version__ = '8.3.1'
15 changes: 14 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@ def queue_event(self, redis, event):

if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis):
batch = redis.rpop(self.queue_name, queue_size)

orig_size = len(batch)
# Deduplicate list, in some misconfigured cases tracking events can be emitted to the
# bus twice, causing them to be processed twice, which LRSs will reject.
# See: https://github.com/openedx/event-routing-backends/issues/410
batch = [i for n, i in enumerate(batch) if i not in batch[n + 1:]]
final_size = len(batch)

if final_size != orig_size: # pragma: no cover
logger.warning(f"{orig_size - final_size} duplicate events in event-routing-backends batch queue! "
f"This is a likely due to misconfiguration of EVENT_TRACKING_BACKENDS.")
return batch

return None
Expand All @@ -237,7 +248,9 @@ def time_to_send(self, redis):
if not last_sent:
return True
time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8')))
return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)
ready = time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)

return ready

def process_event(self, event):
"""
Expand Down
10 changes: 3 additions & 7 deletions event_routing_backends/processors/caliper/tests/test_caliper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from django.test import SimpleTestCase
from django.test.utils import override_settings
from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled
from mock import MagicMock, call, patch, sentinel

from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor
Expand All @@ -29,13 +28,11 @@ def setUp(self):

@override_settings(CALIPER_EVENTS_ENABLED=False)
def test_skip_event_when_disabled(self):
with self.assertRaises(NoBackendEnabled):
self.processor(self.sample_event)
self.assertFalse(self.processor(self.sample_event))

@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])
self.assertFalse(self.processor([self.sample_event]))

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand Down Expand Up @@ -130,6 +127,5 @@ def test_send_method_with_successfull_flow_logging_disabled(
def test_with_no_registry(self, mocked_logger):
backend = CaliperProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend([self.sample_event]))
self.assertFalse(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()
4 changes: 1 addition & 3 deletions event_routing_backends/processors/mixins/base_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ def get_data(self, key, required=False):
result = None

if result is None:
if not required:
logger.warning('Could not get value for %s in event "%s"', key, self.event.get('name', None))
else:
Comment on lines -178 to -180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great change here

if required:
raise ValueError(
'Could not get value for {} in event "{}"'.format(key, self.event.get('name', None))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from logging import getLogger

from eventtracking.processors.exceptions import EventEmissionExit, NoTransformerImplemented
from eventtracking.processors.exceptions import NoBackendEnabled, NoTransformerImplemented

logger = getLogger(__name__)

Expand Down Expand Up @@ -34,13 +34,18 @@ def __call__(self, events):
"""
returned_events = []
for event in events:
transformed_event = self.transform_event(event)
if not transformed_event:
raise EventEmissionExit
if isinstance(transformed_event, list):
returned_events += transformed_event
else:
returned_events.append(transformed_event)
try:
transformed_event = self.transform_event(event)
if not transformed_event:
pass
elif isinstance(transformed_event, list):
returned_events += transformed_event
else:
returned_events.append(transformed_event)

# If the backend isn't enabled at all, early out
except NoBackendEnabled:
break
return returned_events

def transform_event(self, event):
Expand All @@ -60,7 +65,6 @@ def transform_event(self, event):

try:
transformed_event = self.get_transformed_event(event)

except NoTransformerImplemented:
logger.error('Could not get transformer for %s event.', event_name)
return None
Expand Down
14 changes: 4 additions & 10 deletions event_routing_backends/processors/xapi/tests/test_xapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from django.test import SimpleTestCase
from django.test.utils import override_settings
from eventtracking.processors.exceptions import EventEmissionExit, NoBackendEnabled
from mock import MagicMock, call, patch, sentinel
from tincan import Activity, Statement

Expand All @@ -25,13 +24,11 @@ def setUp(self):

@override_settings(XAPI_EVENTS_ENABLED=False)
def test_skip_event_when_disabled(self):
with self.assertRaises(NoBackendEnabled):
self.processor(self.sample_event)
self.assertFalse(self.processor(self.sample_event))

@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])
self.assertFalse(self.processor([self.sample_event]))

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand Down Expand Up @@ -91,9 +88,7 @@ def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transfo
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

with self.assertRaises(EventEmissionExit):
self.processor([self.sample_event])

self.assertFalse(self.processor([self.sample_event]))
self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls)

@override_settings(XAPI_EVENT_LOGGING_ENABLED=False)
Expand All @@ -116,6 +111,5 @@ def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked
def test_with_no_registry(self, mocked_logger):
backend = XApiProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend([self.sample_event]))
self.assertFalse(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()
Loading