Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prefill state caches #2224

Merged
merged 7 commits into from
May 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def __init__(self, txn, name, database_engine, after_callbacks):
object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks)

def call_after(self, callback, *args):
def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
"""
self.after_callbacks.append((callback, args))
self.after_callbacks.append((callback, args, kwargs))

def __getattr__(self, name):
return getattr(self.txn, name)
Expand Down Expand Up @@ -319,8 +319,8 @@ def inner_func(conn, *args, **kwargs):
inner_func, *args, **kwargs
)
finally:
for after_callback, after_args in after_callbacks:
after_callback(*after_args)
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
defer.returnValue(result)

@defer.inlineCallbacks
Expand Down
15 changes: 10 additions & 5 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ def _persist_events(self, events_and_contexts, backfilled=False,

event_counter.inc(event.type, origin_type, origin_entity)

for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)

@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
Expand Down Expand Up @@ -435,10 +440,10 @@ def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
Assumes that we are only persisting events for one room at a time.

Returns:
2-tuple (to_delete, to_insert) where both are state dicts, i.e.
(type, state_key) -> event_id. `to_delete` are the entries to
3-tuple (to_delete, to_insert, new_state) where both are state dicts,
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
to insert.
to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied.
"""
# Now we need to work out the different state sets for
Expand Down Expand Up @@ -545,7 +550,7 @@ def get_events(ev_ids):
if ev_id in events_to_insert
}

defer.returnValue((to_delete, to_insert))
defer.returnValue((to_delete, to_insert, current_state))

@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
Expand Down Expand Up @@ -698,7 +703,7 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,

def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple
to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
Expand Down
12 changes: 12 additions & 0 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts):
],
)

# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)

self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
Expand Down