Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use deltas to calculate current state deltas #3595

Merged
merged 7 commits into from
Jul 24, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ def _persist_events(self, events_and_contexts, backfilled=False,
new_forward_extremeties = {}

# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}

# map room_id->(to_delete, to_insert) where to_delete is a list
Expand Down Expand Up @@ -419,28 +421,40 @@ def _persist_events(self, events_and_contexts, backfilled=False,
logger.info(
"Calculating state delta for room %s", room_id,
)

with Measure(
self._clock,
"persist_events.get_new_state_after_events",
self._clock,
"persist_events.get_new_state_after_events",
):
current_state = yield self._get_new_state_after_events(
res = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
)

if current_state is not None:
current_state_for_room[room_id] = current_state
current_state, delta_ids = res

# If either is not None then there has been a change,
# and we need to work out the delta (or use that
# given)
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = ([], delta_ids)
elif current_state is not None:
with Measure(
self._clock,
"persist_events.calculate_state_delta",
self._clock,
"persist_events.calculate_state_delta",
):
delta = yield self._calculate_state_delta(
room_id, current_state,
)
state_delta_for_room[room_id] = delta
state_delta_for_room[room_id] = delta

# If we have the current_state then let's prefill
# the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state

yield self.runInteraction(
"persist_events",
Expand Down Expand Up @@ -539,16 +553,20 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
the new forward extremities for the room.

Returns:
Deferred[dict[(str,str), str]|None]:
None if there are no changes to the room state, or
a dict of (type, state_key) -> event_id].
Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
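Returns a tuple of two state maps, the first being the full new current
state and the second being the delta to the existing current state.
If both are None then there has been no change. The second is None while
the first is not when no precomputed delta is available (for example when
the state had to be resolved across multiple state groups); the caller
must then calculate the delta from the full state itself.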
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we clarify when the second is None but the first is not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(done)

"""

if not new_latest_event_ids:
return

# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}

state_group_deltas = {}

for ev, ctx in events_context:
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
Expand All @@ -567,6 +585,9 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
if current_state_ids is not None:
state_groups_map[ctx.state_group] = current_state_ids

if ctx.prev_group:
state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids

# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
# pull the state group from its context.
Expand Down Expand Up @@ -608,7 +629,7 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
# If the old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
return
defer.returnValue((None, None))

# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
Expand All @@ -620,7 +641,17 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
defer.returnValue(state_groups_map[new_state_groups.pop()])
new_state_group = new_state_groups.pop()

delta_ids = None
if len(old_state_groups) == 1:
old_state_group = old_state_groups.pop()

delta_ids = state_group_deltas.get(
(old_state_group, new_state_group,), None
)

defer.returnValue((state_groups_map[new_state_group], delta_ids))

# Ok, we need to defer to the state handler to resolve our state sets.

Expand All @@ -639,7 +670,7 @@ def get_events(ev_ids):
room_id, state_groups, events_map, get_events
)

defer.returnValue(res.state)
defer.returnValue((res.state, None))

@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
Expand Down