Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Store state groups separately from events #2784

Merged
merged 15 commits into from
Feb 6, 2018
Merged
24 changes: 17 additions & 7 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1831,8 +1831,8 @@ def do_auth(self, origin, event, context, auth_events):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state

self._update_context_for_auth_events(
context, auth_events, event_key,
yield self._update_context_for_auth_events(
event, context, auth_events, event_key,
)

if different_auth and not event.internal_metadata.is_outlier():
Expand Down Expand Up @@ -1913,8 +1913,8 @@ def do_auth(self, origin, event, context, auth_events):
# 4. Look at rejects and their proofs.
# TODO.

self._update_context_for_auth_events(
context, auth_events, event_key,
yield self._update_context_for_auth_events(
event, context, auth_events, event_key,
)

try:
Expand All @@ -1923,11 +1923,15 @@ def do_auth(self, origin, event, context, auth_events):
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e

def _update_context_for_auth_events(self, context, auth_events,
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events,
event_key):
"""Update the state_ids in an event context after auth event resolution
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.

Args:
event (Event): The event we're handling the context for

context (synapse.events.snapshot.EventContext): event context
to be updated

Expand All @@ -1950,7 +1954,13 @@ def _update_context_for_auth_events(self, context, auth_events,
context.prev_state_ids.update({
k: a.event_id for k, a in auth_events.iteritems()
})
context.state_group = self.store.get_next_state_group()
context.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
)

@defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupReadStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
Expand All @@ -37,7 +37,7 @@
# the method descriptor on the DataStore and chuck them into our class.


class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):

def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
Expand Down
56 changes: 49 additions & 7 deletions synapse/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,15 @@ def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
def compute_event_context(self, event, old_state=None):
"""Build an EventContext structure for the event.

This works out what the current state should be for the event, and
generates a new state group if necessary.

Args:
event (synapse.events.EventBase):
old_state (dict|None): The state at the event if it can't be
calculated from existing events. This is normally only specified
when receiving an event from federation for which we don't have
the prev events, e.g. when backfilling.
Returns:
synapse.events.snapshot.EventContext:
"""
Expand All @@ -208,15 +215,22 @@ def compute_event_context(self, event, old_state=None):
context.current_state_ids = {}
context.prev_state_ids = {}
context.prev_state_events = []
context.state_group = self.store.get_next_state_group()

# We don't store state for outliers, so we don't generate a state
# group for it.
context.state_group = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. the state_group field on EventContext is documented as 'state_group (int): state group id', so if it can now be None as well as an int that needs updating, please.


defer.returnValue(context)

if old_state:
# We already have the state, so we don't need to calculate it.
# Let's just correctly fill out the context and create a
# new state group for it.

context = EventContext()
context.prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
context.state_group = self.store.get_next_state_group()

if event.is_state():
key = (event.type, event.state_key)
Expand All @@ -229,6 +243,14 @@ def compute_event_context(self, event, old_state=None):
else:
context.current_state_ids = context.prev_state_ids

context.state_group = yield self.store.store_state_group(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to create a new state_group if it's not a state event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. Basically, we've been explicitly told the state at this point (old_state) and we need to persist that. This e.g. happens when we're backfilling and we're told the state at this oldest event. We could try and be clever and check if we've already persisted a state group with identical state, but we're not that clever atm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
current_state_ids=context.current_state_ids,
)

context.prev_state_events = []
defer.returnValue(context)

Expand All @@ -242,7 +264,8 @@ def compute_event_context(self, event, old_state=None):
context = EventContext()
context.prev_state_ids = curr_state
if event.is_state():
context.state_group = self.store.get_next_state_group()
# If this is a state event then we need to create a new state
# group for the state after this event.

key = (event.type, event.state_key)
if key in context.prev_state_ids:
Expand All @@ -253,23 +276,42 @@ def compute_event_context(self, event, old_state=None):
context.current_state_ids[key] = event.event_id

if entry.state_group:
# If the state at the event has a state group assigned then
# we can use that as the prev group
context.prev_group = entry.state_group
context.delta_ids = {
key: event.event_id
}
elif entry.prev_group:
# If the state at the event only has a prev group, then we can
# use that as a prev group too.
context.prev_group = entry.prev_group
context.delta_ids = dict(entry.delta_ids)
context.delta_ids[key] = event.event_id

context.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
)
else:
context.current_state_ids = context.prev_state_ids
context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids

if entry.state_group is None:
entry.state_group = self.store.get_next_state_group()
entry.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=entry.prev_group,
delta_ids=entry.delta_ids,
current_state_ids=context.current_state_ids,
)
entry.state_id = entry.state_group

context.state_group = entry.state_group
context.current_state_ids = context.prev_state_ids
context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids

context.prev_state_events = []
defer.returnValue(context)
Expand Down
1 change: 0 additions & 1 deletion synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def __init__(self, db_conn, hs):
)

self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,9 @@ def is_connection_closed(self, conn):

def lock_table(self, txn, table):
# Issues a LOCK TABLE ... in EXCLUSIVE MODE statement for `table` on `txn`.
# NOTE: `table` is %-formatted directly into the SQL text rather than
# passed as a bound parameter, so it must be a trusted identifier.
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))

def get_next_state_group_id(self, txn):
"""Returns an int that can be used as a new state_group ID

Args:
txn: object providing execute() and fetchone(), used to run the query.

Returns:
The first column of the row returned by
nextval('state_group_id_seq').
"""
# Ask the state_group_id_seq sequence for its next value.
txn.execute("SELECT nextval('state_group_id_seq')")
return txn.fetchone()[0]
19 changes: 19 additions & 0 deletions synapse/storage/engines/sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from synapse.storage.prepare_database import prepare_database

import struct
import threading


class Sqlite3Engine(object):
Expand All @@ -24,6 +25,11 @@ class Sqlite3Engine(object):
def __init__(self, database_module, database_config):
# Stores the database module and sets up the state-group id counter
# together with its lock. `database_config` is not used in the lines
# shown here.
self.module = database_module

# The current max state_group, or None if we haven't looked
# in the DB yet.
self._current_state_group_id = None
# Held by get_next_state_group_id while it reads and updates the counter.
self._current_state_group_id_lock = threading.Lock()

def check_database(self, txn):
pass

Expand All @@ -43,6 +49,19 @@ def is_connection_closed(self, conn):
def lock_table(self, txn, table):
# No-op here: returns immediately without issuing any SQL.
return

def get_next_state_group_id(self, txn):
"""Returns an int that can be used as a new state_group ID

The counter is kept in memory on this engine object. While it is still
None it is seeded from the largest id in the state_groups table; the
value returned is the counter after adding one.

Args:
txn: object providing execute() and fetchone(), used for the seed query.

Returns:
The incremented value of the in-memory counter.
"""
# We do application locking here since if we're using sqlite then
# we are a single process synapse.
with self._current_state_group_id_lock:
if self._current_state_group_id is None:
# Counter not loaded yet: read the current maximum id. COALESCE
# makes the query yield 0 when max(id) has no value to report.
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
self._current_state_group_id = txn.fetchone()[0]

# Advance the counter and hand out the new value.
self._current_state_group_id += 1
return self._current_state_group_id


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down
10 changes: 4 additions & 6 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,9 +742,8 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
events_and_contexts=events_and_contexts,
)

# Insert into the state_groups, state_groups_state, and
# event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, events_and_contexts)
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)

# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
Expand Down Expand Up @@ -979,10 +978,9 @@ def _update_outliers_txn(self, txn, events_and_contexts):
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.

# insert into the state_group, state_groups_state and
# event_to_state_groups tables.
# insert into event_to_state_groups.
try:
self._store_mult_state_groups_txn(txn, ((event, context),))
self._store_event_state_mappings_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise
Expand Down
37 changes: 37 additions & 0 deletions synapse/storage/schema/delta/47/state_group_seq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.engines import PostgresEngine


def run_create(cur, database_engine, *args, **kwargs):
"""Create the state_group_id_seq sequence when running on Postgres.

The sequence is started at one more than the largest id currently in
state_groups, or at 1 when that query reports no maximum.

Args:
cur: cursor providing execute() and fetchone().
database_engine: the engine in use; the sequence is only created
when this is a PostgresEngine instance.
*args, **kwargs: accepted but not used here.
"""
if isinstance(database_engine, PostgresEngine):
# if we already have some state groups, we want to start making new
# ones with a higher id.
cur.execute("SELECT max(id) FROM state_groups")
row = cur.fetchone()

# max(id) comes back as None when there is nothing to take a max of.
if row[0] is None:
start_val = 1
else:
start_val = row[0] + 1

# start_val is handed to execute() as a query argument rather than
# being formatted into the statement by hand.
cur.execute(
"CREATE SEQUENCE state_group_id_seq START WITH %s",
(start_val, ),
)


def run_upgrade(*args, **kwargs):
# Nothing to do at upgrade time; any arguments are accepted and ignored.
pass
Loading