Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #975 from matrix-org/erikj/multi_event_persist
Browse files Browse the repository at this point in the history
Ensure we only persist an event once at a time
  • Loading branch information
erikjohnston authored Aug 3, 2016
2 parents 97f072d + 80ad710 commit f5da3ba
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.api.errors import SynapseError

from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
from collections import deque, namedtuple, OrderedDict

import synapse
import synapse.metrics
Expand Down Expand Up @@ -403,6 +403,23 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled):
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
"""
# Ensure that we don't have the same event twice.
# Pick the earliest non-outlier if there is one, else the earliest one.
new_events_and_contexts = OrderedDict()
for event, context in events_and_contexts:
prev_event_context = new_events_and_contexts.get(event.event_id)
if prev_event_context:
if not event.internal_metadata.is_outlier():
if prev_event_context[0].internal_metadata.is_outlier():
# To ensure correct ordering we pop, as OrderedDict is
# ordered by first insertion.
new_events_and_contexts.pop(event.event_id, None)
new_events_and_contexts[event.event_id] = (event, context)
else:
new_events_and_contexts[event.event_id] = (event, context)

events_and_contexts = new_events_and_contexts.values()

depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
Expand Down Expand Up @@ -433,8 +450,6 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled):
for event_id, outlier in txn.fetchall()
}

# Remove the events that we've seen before.
event_map = {}
to_remove = set()
for event, context in events_and_contexts:
if context.rejected:
Expand All @@ -445,23 +460,6 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled):
to_remove.add(event)
continue

# Handle the case of the list including the same event multiple
# times. The tricky thing here is when they differ by whether
# they are an outlier.
if event.event_id in event_map:
other = event_map[event.event_id]

if not other.internal_metadata.is_outlier():
to_remove.add(event)
continue
elif not event.internal_metadata.is_outlier():
to_remove.add(event)
continue
else:
to_remove.add(other)

event_map[event.event_id] = event

if event.event_id not in have_persisted:
continue

Expand Down

0 comments on commit f5da3ba

Please sign in to comment.