This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Store state groups separately from events #2784

Merged (15 commits) on Feb 6, 2018
24 changes: 17 additions & 7 deletions synapse/handlers/federation.py
@@ -1831,8 +1831,8 @@ def do_auth(self, origin, event, context, auth_events):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state

self._update_context_for_auth_events(
context, auth_events, event_key,
yield self._update_context_for_auth_events(
event, context, auth_events, event_key,
)

if different_auth and not event.internal_metadata.is_outlier():
@@ -1913,8 +1913,8 @@ def do_auth(self, origin, event, context, auth_events):
# 4. Look at rejects and their proofs.
# TODO.

self._update_context_for_auth_events(
context, auth_events, event_key,
yield self._update_context_for_auth_events(
event, context, auth_events, event_key,
)

try:
@@ -1923,11 +1923,15 @@ def do_auth(self, origin, event, context, auth_events):
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e

def _update_context_for_auth_events(self, context, auth_events,
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events,
event_key):
"""Update the state_ids in an event context after auth event resolution
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.

Args:
event (Event): The event we're handling the context for

context (synapse.events.snapshot.EventContext): event context
to be updated

@@ -1950,7 +1954,13 @@ def _update_context_for_auth_events(self, context, auth_events,
context.prev_state_ids.update({
k: a.event_id for k, a in auth_events.iteritems()
})
context.state_group = self.store.get_next_state_group()
context.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
)

@defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
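For orientation, here is a toy, in-memory stand-in for the store_state_group interface that the calls above rely on. It is inferred purely from the call sites in this diff, not the actual implementation in synapse/storage/state.py, and the delta handling in particular is a simplification.

from twisted.internet import defer
import itertools

class InMemoryStateGroupStore(object):
    """Illustrative stand-in for the new storage API (not synapse code)."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._groups = {}          # state_group id -> {(type, state_key): event_id}
        self._event_to_group = {}  # event_id -> state_group id

    @defer.inlineCallbacks
    def store_state_group(self, event_id, room_id, prev_group, delta_ids,
                          current_state_ids):
        """Persist a state group for `event_id` and return its new ID."""
        # room_id is part of the real schema; this toy version ignores it.
        state_group = next(self._ids)
        if prev_group is not None:
            # Stored as a delta over an existing group.
            state = dict(self._groups[prev_group])
            state.update(delta_ids or {})
        else:
            state = dict(current_state_ids or {})
        self._groups[state_group] = state
        self._event_to_group[event_id] = state_group
        yield defer.succeed(None)  # stands in for the real DB interaction
        defer.returnValue(state_group)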
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/events.py
@@ -19,7 +19,7 @@
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupReadStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
@@ -37,7 +37,7 @@
# the method descriptor on the DataStore and chuck them into our class.


class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):

def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
44 changes: 36 additions & 8 deletions synapse/state.py
@@ -208,15 +208,22 @@ def compute_event_context(self, event, old_state=None):
context.current_state_ids = {}
context.prev_state_ids = {}
context.prev_state_events = []
context.state_group = self.store.get_next_state_group()

context.state_group = yield self.store.store_state_group(
Member: as per #synapse-dev: we don't currently store a state group here, and it's not obvious to me that the state group we will store will be meaningful.

event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
current_state_ids=context.current_state_ids,
)

defer.returnValue(context)

if old_state:
context = EventContext()
context.prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
context.state_group = self.store.get_next_state_group()

if event.is_state():
key = (event.type, event.state_key)
@@ -229,6 +236,14 @@ def compute_event_context(self, event, old_state=None):
else:
context.current_state_ids = context.prev_state_ids

context.state_group = yield self.store.store_state_group(
Member: do we need to create a new state_group if it's not a state event?

Member Author: Yup. Basically, we've been explicitly told the state at this point (old_state) and we need to persist that. This e.g. happens when we're backfilling and we're told the state at this oldest event. We could try and be clever and check if we've already persisted a state group with identical state, but we're not that clever atm.

Member: ok

event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
current_state_ids=context.current_state_ids,
)

context.prev_state_events = []
defer.returnValue(context)
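The deduplication the author mentions in the thread above ("check if we've already persisted a state group with identical state") might look roughly like the following. This is a hypothetical sketch, not part of this PR, and find_state_group_with_state is an assumed helper rather than an existing store method.

# Hypothetical sketch only. find_state_group_with_state is an assumed helper.
existing_group = yield self.store.find_state_group_with_state(
    event.room_id, context.current_state_ids,
)
if existing_group is not None:
    context.state_group = existing_group
else:
    context.state_group = yield self.store.store_state_group(
        event.event_id,
        event.room_id,
        prev_group=None,
        delta_ids=None,
        current_state_ids=context.current_state_ids,
    )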

@@ -242,8 +257,6 @@ def compute_event_context(self, event, old_state=None):
context = EventContext()
context.prev_state_ids = curr_state
if event.is_state():
context.state_group = self.store.get_next_state_group()

key = (event.type, event.state_key)
if key in context.prev_state_ids:
replaces = context.prev_state_ids[key]
@@ -261,15 +274,30 @@ def compute_event_context(self, event, old_state=None):
context.prev_group = entry.prev_group
context.delta_ids = dict(entry.delta_ids)
context.delta_ids[key] = event.event_id

context.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=entry.prev_group,
Member: why is this entry.prev_group rather than context.prev_group as calculated above? (any idea when entry.state_group is falsy so we do the prev_group thing?)

Member: generally the logic in compute_event_context is mysterious :/

Member Author:

> why is this entry.prev_group rather than context.prev_group as calculated above?

That was a typo tbh.

> (any idea when entry.state_group is falsy so we do the prev_group thing?)

entry.state_group is falsey if the resolved state doesn't already have a state group. This would typically happen when the prev events for the event point to two different state groups.

> generally the logic in compute_event_context is mysterious :/

I've tried to add some comments

delta_ids=entry.delta_ids,
current_state_ids=context.current_state_ids,
)
else:
context.current_state_ids = context.prev_state_ids
context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids

if entry.state_group is None:
entry.state_group = self.store.get_next_state_group()
entry.state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=entry.prev_group,
delta_ids=entry.delta_ids,
current_state_ids=context.current_state_ids,
)
entry.state_id = entry.state_group

context.state_group = entry.state_group
context.current_state_ids = context.prev_state_ids
context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids

context.prev_state_events = []
defer.returnValue(context)
1 change: 0 additions & 1 deletion synapse/storage/__init__.py
@@ -124,7 +124,6 @@ def __init__(self, db_conn, hs):
)

self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
6 changes: 6 additions & 0 deletions synapse/storage/engines/postgres.py
@@ -62,3 +62,9 @@ def is_connection_closed(self, conn):

def lock_table(self, txn, table):
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))

def get_next_state_group_id(self, txn):
"""Returns an int that can be used as a new state_group ID
"""
txn.execute("SELECT nextval('state_group_id_seq')")
return txn.fetchone()[0]
19 changes: 19 additions & 0 deletions synapse/storage/engines/sqlite3.py
@@ -16,6 +16,7 @@
from synapse.storage.prepare_database import prepare_database

import struct
import threading


class Sqlite3Engine(object):
@@ -24,6 +25,11 @@ class Sqlite3Engine(object):
def __init__(self, database_module, database_config):
self.module = database_module

# The current max state_group, or None if we haven't looked
# in the DB yet.
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()

def check_database(self, txn):
pass

@@ -43,6 +49,19 @@ def is_connection_closed(self, conn):
def lock_table(self, txn, table):
return

def get_next_state_group_id(self, txn):
"""Returns an int that can be used as a new state_group ID
"""
# We do application locking here since if we're using sqlite then
# we are a single process synapse.
with self._current_state_group_id_lock:
if self._current_state_group_id is None:
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
self._current_state_group_id = txn.fetchone()[0]

self._current_state_group_id += 1
return self._current_state_group_id
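For context, one plausible way the storage layer might consume this engine method inside a transaction is sketched below; the helper name and the insert are illustrative, not necessarily the PR's actual code.

def _store_state_group_txn(self, txn, event_id, room_id, current_state_ids):
    # Allocate the next ID via the database engine, then write the group row.
    state_group = self.database_engine.get_next_state_group_id(txn)
    self._simple_insert_txn(
        txn,
        table="state_groups",
        values={
            "id": state_group,
            "room_id": room_id,
            "event_id": event_id,
        },
    )
    return state_group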


# Following functions taken from: https://github.com/coleifer/peewee

10 changes: 4 additions & 6 deletions synapse/storage/events.py
@@ -742,9 +742,8 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
events_and_contexts=events_and_contexts,
)

# Insert into the state_groups, state_groups_state, and
# event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, events_and_contexts)
# Insert into event_to_state_groups tables.
Member: s/tables/table/. Or just s/tables//

self._store_event_state_mappings_txn(txn, events_and_contexts)

# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@@ -979,10 +978,9 @@ def _update_outliers_txn(self, txn, events_and_contexts):
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.

# insert into the state_group, state_groups_state and
# event_to_state_groups tables.
# insert into event_to_state_groups tables.
Member: ditto

try:
self._store_mult_state_groups_txn(txn, ((event, context),))
self._store_event_state_mappings_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise
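The renamed helper referenced above is not shown in this diff; a simplified sketch of what it plausibly does, recording only the event-to-state-group mapping and ignoring outlier/rejection handling, might look like this:

def _store_event_state_mappings_txn(self, txn, events_and_contexts):
    # Only the event -> state_group mapping is written here now; the state
    # groups themselves are persisted earlier via store_state_group.
    self._simple_insert_many_txn(
        txn,
        table="event_to_state_groups",
        values=[
            {"event_id": event.event_id, "state_group": context.state_group}
            for event, context in events_and_contexts
        ],
    )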
24 changes: 24 additions & 0 deletions synapse/storage/schema/delta/47/state_group_seq.py
@@ -0,0 +1,24 @@
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.engines import PostgresEngine


def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
cur.execute("CREATE SEQUENCE state_group_id_seq")
Member: do you not need to initialise this? (see ab8567a which I did when playing with this)

Member Author: Ooh, yes. I've cherry-picked that.
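A minimal sketch of the initialisation being discussed, assuming the sequence should start just above the current maximum state_groups id (psycopg2 substitutes the %s parameter client-side, so it can be used in this CREATE SEQUENCE statement):

def run_create(cur, database_engine, *args, **kwargs):
    if isinstance(database_engine, PostgresEngine):
        # Start the sequence after the highest existing state group ID so
        # newly allocated IDs cannot collide with pre-existing rows.
        cur.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
        start_val = cur.fetchone()[0] + 1
        cur.execute(
            "CREATE SEQUENCE state_group_id_seq START WITH %s",
            (start_val,),
        )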



def run_upgrade(*args, **kwargs):
pass