Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15194: Prevent enqueuing duplicate events for an object #16366

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions netbox/extras/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ def event_tracking(request):
:param request: WSGIRequest object with a unique `id` set
"""
current_request.set(request)
events_queue.set([])
events_queue.set({})

yield

# Flush queued webhooks to RQ
flush_events(events_queue.get())
if events := list(events_queue.get().values()):
flush_events(events)

# Clear context vars
current_request.set(None)
events_queue.set([])
events_queue.set({})
32 changes: 19 additions & 13 deletions netbox/extras/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,21 @@ def enqueue_object(queue, instance, user, request_id, action):
if model_name not in registry['model_features']['event_rules'].get(app_label, []):
return

queue.append({
'content_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk,
'event': action,
'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, action),
'username': user.username,
'request_id': request_id
})
assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['data'] = serialize_for_event(instance)
queue[key]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
queue[key] = {
'content_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk,
'event': action,
'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, action),
'username': user.username,
'request_id': request_id
}


def process_event_rules(event_rules, model_name, event, data, username=None, snapshots=None, request_id=None):
Expand Down Expand Up @@ -163,14 +169,14 @@ def process_event_queue(events):
)


def flush_events(queue):
def flush_events(events):
"""
Flush a list of object representation to RQ for webhook processing.
Flush a list of object representations to RQ for event processing.
"""
if queue:
if events:
for name in settings.EVENTS_PIPELINE:
try:
func = import_string(name)
func(queue)
func(events)
except Exception as e:
logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))
29 changes: 8 additions & 21 deletions netbox/extras/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,6 @@ def run_validators(instance, validators):
clear_events = Signal()


def is_same_object(instance, webhook_data, request_id):
"""
Compare the given instance to the most recent queued webhook object, returning True
if they match. This check is used to avoid creating duplicate webhook entries.
"""
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request_id == webhook_data['request_id']
)


@receiver((post_save, m2m_changed))
def handle_changed_object(sender, instance, **kwargs):
"""
Expand Down Expand Up @@ -112,14 +100,13 @@ def handle_changed_object(sender, instance, **kwargs):
objectchange.request_id = request.id
objectchange.save()

# If this is an M2M change, update the previously queued webhook (from post_save)
# Ensure that we're working with fresh M2M assignments
if m2m_changed:
instance.refresh_from_db()

# Enqueue the object for event processing
queue = events_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
queue[-1]['data'] = serialize_for_event(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(queue, instance, request.user, request.id, action)
enqueue_object(queue, instance, request.user, request.id, action)
events_queue.set(queue)

# Increment metric counters
Expand Down Expand Up @@ -179,7 +166,7 @@ def handle_deleted_object(sender, instance, **kwargs):
obj.snapshot() # Ensure the change record includes the "before" state
getattr(obj, related_field_name).remove(instance)

# Enqueue webhooks
# Enqueue the object for event processing
queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
events_queue.set(queue)
Expand All @@ -195,7 +182,7 @@ def clear_events_queue(sender, **kwargs):
"""
logger = logging.getLogger('events')
logger.info(f"Clearing {len(events_queue.get())} queued events ({sender})")
events_queue.set([])
events_queue.set({})


#
Expand Down
27 changes: 25 additions & 2 deletions netbox/extras/tests/test_event_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import django_rq
from django.http import HttpResponse
from django.test import RequestFactory
from django.urls import reverse
from requests import Session
from rest_framework import status
Expand All @@ -12,6 +13,7 @@
from dcim.choices import SiteStatusChoices
from dcim.models import Site
from extras.choices import EventRuleActionChoices, ObjectChangeActionChoices
from extras.context_managers import event_tracking
from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.models import EventRule, Tag, Webhook
from extras.webhooks import generate_signature, send_webhook
Expand Down Expand Up @@ -360,7 +362,7 @@ def dummy_send(_, request, **kwargs):
return HttpResponse()

# Enqueue a webhook for processing
webhooks_queue = []
webhooks_queue = {}
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object(
webhooks_queue,
Expand All @@ -369,11 +371,32 @@ def dummy_send(_, request, **kwargs):
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
flush_events(webhooks_queue)
flush_events(list(webhooks_queue.values()))

# Retrieve the job from queue
job = self.queue.jobs[0]

# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
send_webhook(**job.kwargs)

def test_duplicate_triggers(self):
"""
Test for erroneous duplicate event triggers resulting from saving an object multiple times
within the span of a single request.
"""
url = reverse('dcim:site_add')
request = RequestFactory().get(url)
request.id = uuid.uuid4()
request.user = self.user

self.assertEqual(self.queue.count, 0, msg="Unexpected jobs found in queue")

with event_tracking(request):
site = Site(name='Site 1', slug='site-1')
site.save()

# Save the site a second time
site.save()

self.assertEqual(self.queue.count, 1, msg="Duplicate jobs found in queue")
2 changes: 1 addition & 1 deletion netbox/netbox/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@


current_request = ContextVar('current_request', default=None)
events_queue = ContextVar('events_queue', default=[])
events_queue = ContextVar('events_queue', default=dict())
Loading