Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prefill state caches #2224

Merged
merged 7 commits into from
May 16, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def __init__(self, txn, name, database_engine, after_callbacks):
object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks)

def call_after(self, callback, *args):
def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
"""
self.after_callbacks.append((callback, args))
self.after_callbacks.append((callback, args, kwargs))

def __getattr__(self, name):
return getattr(self.txn, name)
Expand Down Expand Up @@ -319,8 +319,8 @@ def inner_func(conn, *args, **kwargs):
inner_func, *args, **kwargs
)
finally:
for after_callback, after_args in after_callbacks:
after_callback(*after_args)
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
defer.returnValue(result)

@defer.inlineCallbacks
Expand Down
28 changes: 23 additions & 5 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,24 @@ def _persist_events(self, events_and_contexts, backfilled=False,

event_counter.inc(event.type, origin_type, origin_entity)

for event, context in chunk:
if context.app_service:
origin_type = "local"
origin_entity = context.app_service.id
elif self.hs.is_mine_id(event.sender):
origin_type = "local"
origin_entity = "*client*"
else:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)

event_counter.inc(event.type, origin_type, origin_entity)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this looks a bit duplicated?


for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)

@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
Expand Down Expand Up @@ -435,10 +453,10 @@ def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
Assumes that we are only persisting events for one room at a time.

Returns:
2-tuple (to_delete, to_insert) where both are state dicts, i.e.
(type, state_key) -> event_id. `to_delete` are the entries to
3-tuple (to_delete, to_insert, new_state) where both are state dicts,
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
to insert.
to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied.
"""
# Now we need to work out the different state sets for
Expand Down Expand Up @@ -545,7 +563,7 @@ def get_events(ev_ids):
if ev_id in events_to_insert
}

defer.returnValue((to_delete, to_insert))
defer.returnValue((to_delete, to_insert, current_state))

@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
Expand Down Expand Up @@ -698,7 +716,7 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,

def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple
to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
Expand Down
12 changes: 12 additions & 0 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts):
],
)

# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)

self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
Expand Down