Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3995 from matrix-org/rav/no_deextrem_outliers
Browse files Browse the repository at this point in the history
Fix bug in forward_extremity update logic
  • Loading branch information
richvdh authored Oct 4, 2018
2 parents d867943 + 9693625 commit c6dbd21
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 33 deletions.
1 change: 1 addition & 0 deletions changelog.d/3995.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in event persistence logic which caused 'NoneType is not iterable'
102 changes: 69 additions & 33 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
Expand Down Expand Up @@ -386,12 +387,10 @@ def _persist_events(self, events_and_contexts, backfilled=False,
)

for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremeties(
new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)

Expand All @@ -400,6 +399,12 @@ def _persist_events(self, events_and_contexts, backfilled=False,
# No change in extremities, so no change in state
continue

# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"

new_forward_extremeties[room_id] = new_latest_event_ids

len_1 = (
Expand Down Expand Up @@ -517,44 +522,79 @@ def _persist_events(self, events_and_contexts, backfilled=False,
)

@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremities for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
event.event_id for event, ctx in event_contexts

# we're only interested in new events which aren't outliers and which aren't
# being rejected.
new_events = [
event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
]

# start with the existing forward extremities
result = set(latest_event_ids)

# add all the new events to the list
result.update(
event.event_id for event in new_events
)
# Now remove all events that are referenced by the to-be-added events
new_latest_event_ids.difference_update(

# Now remove all events which are prev_events of any of the new events
result.difference_update(
e_id
for event, ctx in event_contexts
for event in new_events
for e_id, _ in event.prev_events
if not event.internal_metadata.is_outlier() and not ctx.rejected
)

# And finally remove any events that are referenced by previously added
# events.
rows = yield self._simple_select_many_batch(
table="event_edges",
column="prev_event_id",
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
"is_state": False,
},
desc="_calculate_new_extremeties",
)
# Finally, remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)

new_latest_event_ids.difference_update(
row["prev_event_id"] for row in rows
)
defer.returnValue(result)

defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
def _get_events_which_are_prevs(self, event_ids):
"""Filter the supplied list of event_ids to get those which are prev_events of
existing (non-outlier/rejected) events.
Args:
event_ids (Iterable[str]): event ids to filter
Returns:
Deferred[List[str]]: filtered event ids
"""
results = []

def _get_events(txn, batch):
sql = """
SELECT prev_event_id
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE
prev_event_id IN (%s)
AND NOT events.outlier
AND rejections.event_id IS NULL
""" % (
",".join("?" for _ in batch),
)

txn.execute(sql, batch)
results.extend(r[0] for r in txn)

for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_events_which_are_prevs",
_get_events,
chunk,
)

defer.returnValue(results)

@defer.inlineCallbacks
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
Expand Down Expand Up @@ -586,10 +626,6 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
the new current state is only returned if we've already calculated
it.
"""

if not new_latest_event_ids:
return

# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}

Expand Down

0 comments on commit c6dbd21

Please sign in to comment.