This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split out SignatureStore and EventFederationStore #2925

Merged: 4 commits, Mar 1, 2018
Changes shown from 2 of the 4 commits
50 changes: 8 additions & 42 deletions synapse/replication/slave/storage/events.py
@@ -17,13 +17,13 @@

 from synapse.api.constants import EventTypes
 from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamStore
-from synapse.storage.signatures import SignatureStore
+from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
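This import swap is the heart of the PR: read-only query methods move into `*WorkerStore` classes that both the master store and the replication slaves inherit, while write paths stay on the full store. A minimal sketch of the split, with an in-memory dict standing in for the database (class names mirror the PR; the bodies are illustrative, not Synapse's actual implementation):

```python
class SignatureWorkerStore(object):
    """Read-only half: safe to use from a replication worker."""

    def __init__(self):
        self._hashes = {}  # event_id -> reference hash; stand-in for the DB

    def get_event_reference_hash(self, event_id):
        return self._hashes.get(event_id)


class SignatureStore(SignatureWorkerStore):
    """Full store: adds the write paths only the master process needs."""

    def add_event_hashes(self, event_id, ref_hash):
        self._hashes[event_id] = ref_hash


store = SignatureStore()
store.add_event_hashes("$ev1", "sha256:abc")
print(store.get_event_reference_hash("$ev1"))  # sha256:abc
```

A worker that inherits only `SignatureWorkerStore` then has no write methods to call, by construction.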
@@ -40,8 +40,12 @@
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
-                       EventsWorkerStore, StateGroupWorkerStore,
+class SlavedEventStore(EventFederationWorkerStore,
+                       RoomMemberWorkerStore,
+                       EventPushActionsWorkerStore,
+                       EventsWorkerStore,
+                       StateGroupWorkerStore,
+                       SignatureWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
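The rewritten class definition composes all of its behaviour through multiple inheritance, relying on Python's C3 method resolution order to linearize the worker-store mixins. A toy demonstration of how lookup threads through such a stack (stand-in classes, not Synapse's):

```python
class BaseStore(object):
    def stream_positions(self):
        return {}


class FederationWorkerMixin(BaseStore):
    def get_auth_chain_ids(self, event_id):
        return ["$auth1", "$auth2"]  # illustrative


class SignatureWorkerMixin(BaseStore):
    def get_event_reference_hash(self, event_id):
        return {"sha256": "..."}  # illustrative


class SlavedStore(FederationWorkerMixin, SignatureWorkerMixin, BaseStore):
    pass


print([c.__name__ for c in SlavedStore.__mro__])
# ['SlavedStore', 'FederationWorkerMixin', 'SignatureWorkerMixin',
#  'BaseStore', 'object']
```

Each mixin contributes its read methods, and `BaseStore` (here playing the role of `BaseSlavedStore`) appears exactly once, at the end of the MRO.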
@@ -72,9 +76,6 @@ def __init__(self, db_conn, hs):

     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_latest_event_ids_in_room = EventFederationStore.__dict__[
-        "get_latest_event_ids_in_room"
-    ]
 
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
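The comment above explains the `__dict__` idiom used throughout this class body: Synapse's `@cached` decorator replaces a method with a descriptor object, so ordinary attribute access on the class invokes the descriptor protocol instead of returning the raw object. Indexing `SomeStore.__dict__` bypasses `__get__` and returns the descriptor itself, which can then be re-attached to another class. A simplified stand-in for the real `cached` descriptor, illustrative only:

```python
class cached(object):
    """Toy memoizing descriptor, loosely modelled on synapse's @cached."""

    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __get__(self, obj, objtype=None):
        def wrapper(*args):  # a bound, memoizing view of the function
            if args not in self.cache:
                self.cache[args] = self.func(obj, *args)
            return self.cache[args]
        return wrapper


class EventStore(object):
    @cached
    def get_latest_event_ids_in_room(self, room_id):
        return ["$latest:" + room_id]  # illustrative query


# Ordinary attribute access triggers __get__ and yields the wrapper:
print(type(EventStore.get_latest_event_ids_in_room).__name__)  # function
# __dict__ access bypasses the descriptor protocol and yields the raw object:
print(type(EventStore.__dict__["get_latest_event_ids_in_room"]).__name__)  # cached
```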
@@ -100,48 +101,13 @@ def __init__(self, db_conn, hs):

     _get_events_around_txn = DataStore._get_events_around_txn.__func__
 
-    get_backfill_events = DataStore.get_backfill_events.__func__
-    _get_backfill_events = DataStore._get_backfill_events.__func__
-    get_missing_events = DataStore.get_missing_events.__func__
-    _get_missing_events = DataStore._get_missing_events.__func__
-
-    get_auth_chain = DataStore.get_auth_chain.__func__
-    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
-    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
     get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
 
-    get_forward_extremeties_for_room = (
-        DataStore.get_forward_extremeties_for_room.__func__
-    )
-    _get_forward_extremeties_for_room = (
-        EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
-    )
-
     get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
 
     get_federation_out_pos = DataStore.get_federation_out_pos.__func__
     update_federation_out_pos = DataStore.update_federation_out_pos.__func__
 
-    get_latest_event_ids_and_hashes_in_room = (
-        DataStore.get_latest_event_ids_and_hashes_in_room.__func__
-    )
-    _get_latest_event_ids_and_hashes_in_room = (
-        DataStore._get_latest_event_ids_and_hashes_in_room.__func__
-    )
-    _get_event_reference_hashes_txn = (
-        DataStore._get_event_reference_hashes_txn.__func__
-    )
-    add_event_hashes = (
-        DataStore.add_event_hashes.__func__
-    )
-    get_event_reference_hashes = (
-        SignatureStore.__dict__["get_event_reference_hashes"]
-    )
-    get_event_reference_hash = (
-        SignatureStore.__dict__["get_event_reference_hash"]
-    )
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
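The `.__func__` pattern in this block is a Python 2 idiom (Synapse still ran on Python 2 at this point): `DataStore.some_method` evaluates to an unbound method that refuses to bind to instances of an unrelated class, and `.__func__` extracts the underlying plain function so it can be grafted onto `SlavedEventStore`. A sketch of the effect; the `__dict__` form shown here behaves the same on Python 2 and 3:

```python
class DataStore(object):
    def get_room_max_stream_ordering(self):
        return self._max_ordering  # illustrative attribute


class SlavedStore(object):
    # Equivalent to DataStore.get_room_max_stream_ordering.__func__ on
    # Python 2; __dict__ access sidesteps method binding on either version.
    get_room_max_stream_ordering = DataStore.__dict__[
        "get_room_max_stream_ordering"
    ]

    def __init__(self):
        self._max_ordering = 42


print(SlavedStore().get_room_max_stream_ordering())  # 42
```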
264 changes: 135 additions & 129 deletions synapse/storage/event_federation.py
@@ -15,7 +15,10 @@

 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+
 from synapse.api.errors import StoreError
 from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
@@ -27,30 +30,8 @@
 logger = logging.getLogger(__name__)
 
 
-class EventFederationStore(SQLBaseStore):
-    """ Responsible for storing and serving up the various graphs associated
-    with an event. Including the main event graph and the auth chains for an
-    event.
-
-    Also has methods for getting the front (latest) and back (oldest) edges
-    of the event graphs. These are used to generate the parents for new events
-    and backfilling from another server respectively.
-    """
-
-    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
-    def __init__(self, db_conn, hs):
-        super(EventFederationStore, self).__init__(db_conn, hs)
-
-        self.register_background_update_handler(
-            self.EVENT_AUTH_STATE_ONLY,
-            self._background_delete_non_state_event_auth,
-        )
-
-        hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
-        )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+                                 SQLBaseStore):
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.
 
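`get_auth_chain` computes the transitive closure of the auth-event DAG: starting from the given events, it repeatedly collects the events each one cites as its authorization (create, power levels, membership, and so on) until the set stops growing. A self-contained sketch of that closure over an in-memory edge map, rather than the batched SQL the store actually runs:

```python
def get_auth_chain(auth_edges, event_ids, include_given=False):
    """auth_edges: dict mapping event_id -> list of auth event_ids."""
    results = set(event_ids) if include_given else set()
    frontier = set(event_ids)
    while frontier:
        parents = {
            auth_id
            for ev in frontier
            for auth_id in auth_edges.get(ev, ())
        }
        frontier = parents - results
        results |= frontier
    return results


edges = {"$msg": ["$pl", "$member"], "$member": ["$create"], "$pl": ["$create"]}
print(sorted(get_auth_chain(edges, ["$msg"])))
# ['$create', '$member', '$pl']
```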
@@ -228,88 +209,6 @@ def _get_min_depth_interaction(self, txn, room_id):

         return int(min_depth) if min_depth is not None else None
 
-    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
-        min_depth = self._get_min_depth_interaction(txn, room_id)
-
-        if min_depth and depth >= min_depth:
-            return
-
-        self._simple_upsert_txn(
-            txn,
-            table="room_depth",
-            keyvalues={
-                "room_id": room_id,
-            },
-            values={
-                "min_depth": depth,
-            },
-        )
-
-    def _handle_mult_prev_events(self, txn, events):
-        """
-        For the given event, update the event edges table and forward and
-        backward extremities tables.
-        """
-        self._simple_insert_many_txn(
-            txn,
-            table="event_edges",
-            values=[
-                {
-                    "event_id": ev.event_id,
-                    "prev_event_id": e_id,
-                    "room_id": ev.room_id,
-                    "is_state": False,
-                }
-                for ev in events
-                for e_id, _ in ev.prev_events
-            ],
-        )
-
-        self._update_backward_extremeties(txn, events)
-
-    def _update_backward_extremeties(self, txn, events):
-        """Updates the event_backward_extremities tables based on the new/updated
-        events being persisted.
-
-        This is called for new events *and* for events that were outliers, but
-        are now being persisted as non-outliers.
-
-        Forward extremities are handled when we first start persisting the events.
-        """
-        events_by_room = {}
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-            " )"
-            " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
-            " )"
-        )
-
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
-            if not ev.internal_metadata.is_outlier()
-        ])
-
-        query = (
-            "DELETE FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-        )
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
         """For a given room_id and stream_ordering, return the forward
         extremeties of the room at that point in "time".
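A room's forward extremities are the events that no other event lists in its `prev_events`: the current leaves of the room DAG, which become the parents of the next event sent. The store can answer the question for a past stream_ordering because (as the cleanup query later in this diff suggests) it keeps dated snapshots in `stream_ordering_to_exterm`. The leaf computation itself, sketched over in-memory events (illustrative structures):

```python
def forward_extremities(events):
    """events: dict mapping event_id -> list of prev_event_ids."""
    referenced = {prev for prevs in events.values() for prev in prevs}
    return sorted(ev for ev in events if ev not in referenced)


room = {
    "$a": [],      # first event in the room
    "$b": ["$a"],
    "$c": ["$a"],  # a fork: $b and $c both extend $a
}
print(forward_extremities(room))  # ['$b', '$c'] -- the next event cites both
```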
@@ -371,28 +270,6 @@ def get_forward_extremeties_for_room_txn(txn):
             get_forward_extremeties_for_room_txn
         )
 
-    def _delete_old_forward_extrem_cache(self):
-        def _delete_old_forward_extrem_cache_txn(txn):
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
-            sql = ("""
-                DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
-            """)
-            txn.execute(
-                sql,
-                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
-            )
-        return self.runInteraction(
-            "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
-        )
-
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
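Backfill pages backwards through room history: starting from the given events, follow `prev_events` edges towards older (lower-depth) events until `limit` events have been collected. A compact sketch of the walk using a depth-ordered heap, mirroring the traversal rather than the store's SQL (in-memory structures, illustrative):

```python
import heapq


def get_backfill_events(prev_edges, depths, event_list, limit):
    """prev_edges: event_id -> prev event_ids; depths: event_id -> int."""
    seen, results = set(), []
    queue = [(-depths[e], e) for e in event_list]  # negate: pop newest first
    heapq.heapify(queue)
    while queue and len(results) < limit:
        _, ev = heapq.heappop(queue)
        if ev in seen:
            continue
        seen.add(ev)
        results.append(ev)
        for prev in prev_edges.get(ev, ()):
            heapq.heappush(queue, (-depths[prev], prev))
    return results


prev_edges = {"$c": ["$b"], "$b": ["$a"], "$a": []}
depths = {"$a": 1, "$b": 2, "$c": 3}
print(get_backfill_events(prev_edges, depths, ["$c"], limit=2))  # ['$c', '$b']
```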
@@ -522,6 +399,135 @@ def _get_missing_events(self, txn, room_id, earliest_events, latest_events,

         return event_results


+class EventFederationStore(EventFederationWorkerStore):
+    """ Responsible for storing and serving up the various graphs associated
+    with an event. Including the main event graph and the auth chains for an
+    event.
+
+    Also has methods for getting the front (latest) and back (oldest) edges
+    of the event graphs. These are used to generate the parents for new events
+    and backfilling from another server respectively.
+    """
+
+    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+    def __init__(self, db_conn, hs):
+        super(EventFederationStore, self).__init__(db_conn, hs)
+
+        self.register_background_update_handler(
+            self.EVENT_AUTH_STATE_ONLY,
+            self._background_delete_non_state_event_auth,
+        )
+
+        hs.get_clock().looping_call(
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+        )
+
+    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+        min_depth = self._get_min_depth_interaction(txn, room_id)
+
+        if min_depth and depth >= min_depth:
+            return
+
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
+
+    def _handle_mult_prev_events(self, txn, events):
+        """
+        For the given event, update the event edges table and forward and
+        backward extremities tables.
+        """
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": ev.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": ev.room_id,
+                    "is_state": False,
+                }
+                for ev in events
+                for e_id, _ in ev.prev_events
+            ],
+        )
+
+        self._update_backward_extremeties(txn, events)
+
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are now being persisted as non-outliers.
+
+        Forward extremities are handled when we first start persisting the events.
+        """
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
+
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
+
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
+
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [
+                (ev.event_id, ev.room_id) for ev in events
+                if not ev.internal_metadata.is_outlier()
+            ]
+        )
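The `INSERT ... SELECT ... WHERE NOT EXISTS` above is a portable conditional insert: the row is added only if the pair is not already a backward extremity and the event has not already been persisted as a non-outlier. The idiom is easy to verify against SQLite (a standalone demo with a simplified table, not Synapse code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE extremities (event_id TEXT, room_id TEXT)")

query = (
    "INSERT INTO extremities (event_id, room_id)"
    " SELECT ?, ? WHERE NOT EXISTS ("
    "     SELECT 1 FROM extremities WHERE event_id = ? AND room_id = ?"
    " )"
)
# Running the same insert twice only produces one row.
for _ in range(2):
    conn.execute(query, ("$ev", "!room", "$ev", "!room"))

print(conn.execute("SELECT COUNT(*) FROM extremities").fetchone()[0])  # 1
```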

+    def _delete_old_forward_extrem_cache(self):
+        def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = ("""
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                room_id IN (
+                    SELECT room_id
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
+            """)
+            txn.execute(
+                sql,
+                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+            )
+        return self.runInteraction(
+            "_delete_old_forward_extrem_cache",
+            _delete_old_forward_extrem_cache_txn
+        )
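The guard in this cleanup lives in the SQL rather than in Python: rows below the one-month watermark are deleted only in rooms that also have a newer row (the `room_id IN (SELECT ...)` subquery), so a quiet room never loses its last snapshot. The hourly scheduling comes from the `looping_call` in `__init__` above. The guard can be checked in isolation (standalone SQLite demo, not Synapse code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE exterm (room_id TEXT, stream_ordering INTEGER)")
conn.executemany(
    "INSERT INTO exterm VALUES (?, ?)",
    [("!busy", 10), ("!busy", 500), ("!quiet", 10)],
)

watermark = 100  # stands in for self.stream_ordering_month_ago
conn.execute(
    "DELETE FROM exterm WHERE room_id IN ("
    "    SELECT room_id FROM exterm WHERE stream_ordering > ?"
    ") AND stream_ordering < ?",
    (watermark, watermark),
)

print(conn.execute("SELECT * FROM exterm").fetchall())
# [('!busy', 500), ('!quiet', 10)] -- the quiet room keeps its only row
```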

     def clean_room_for_join(self, room_id):
         return self.runInteraction(
             "clean_room_for_join",