This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split state groups into a separate data store #6296

Merged: 15 commits, Dec 20, 2019
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/6295.misc
@@ -0,0 +1 @@
Split out state storage into separate data store.
6 changes: 4 additions & 2 deletions synapse/handlers/pagination.py
@@ -127,7 +127,9 @@ def _purge_history(self, purge_id, room_id, token, delete_local_events):
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, token, delete_local_events)
yield self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
@@ -170,7 +172,7 @@ async def purge_room(self, room_id):
if joined:
raise SynapseError(400, "Users are still joined to this room")

await self.store.purge_room(room_id)
await self.storage.purge_events.purge_room(room_id)

@defer.inlineCallbacks
def get_messages(
2 changes: 2 additions & 0 deletions synapse/storage/__init__.py
@@ -30,6 +30,7 @@
from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage

__all__ = ["DataStores", "DataStore"]
@@ -47,6 +48,7 @@ def __init__(self, hs, stores: DataStores):

self.persistence = EventsPersistenceStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)


def are_all_users_on_domain(txn, database_engine, domain):
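The `PurgeEventsStorage` object registered above is what the pagination handler now calls into; its implementation is not part of these first five commits. A minimal sketch of how such a layer could coordinate the two stores, assuming it delegates to `stores.main` and `stores.state` — the state-store method names `purge_unreferenced_state_groups` and `purge_room_state` are illustrative assumptions, not confirmed by this diff:

from twisted.internet import defer


class PurgeEventsStorage(object):
    """High-level purge logic that spans the main and state data stores.

    A sketch only: the method names called on stores.state are assumed
    for illustration and do not appear in these commits.
    """

    def __init__(self, hs, stores):
        self.stores = stores

    @defer.inlineCallbacks
    def purge_history(self, room_id, token, delete_local_events):
        # The main store purges the events themselves and reports which
        # state groups referenced them (see the events.py change below).
        state_groups = yield self.stores.main.purge_history(
            room_id, token, delete_local_events
        )
        # The state store then decides which of those groups are
        # unreferenced and can be deleted or de-delta'ed.
        yield self.stores.state.purge_unreferenced_state_groups(
            room_id, state_groups
        )

    @defer.inlineCallbacks
    def purge_room(self, room_id):
        yield self.stores.main.purge_room(room_id)
        yield self.stores.state.purge_room_state(room_id)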
3 changes: 3 additions & 0 deletions synapse/storage/data_stores/__init__.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.data_stores.state import StateGroupDataStore


class DataStores(object):
"""The various data stores.
@@ -24,3 +26,4 @@ def __init__(self, main_store, db_conn, hs):
# Note we pass in the main store here as workers use a different main
# store.
self.main = main_store
self.state = StateGroupDataStore(db_conn, hs)
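Note the wiring asymmetry this creates: the main store is still built by the caller (workers pass in a different main store, per the comment above), while the state store is now constructed inside `DataStores` itself. A rough sketch of how the pieces fit together — `main_store_class` is an illustrative stand-in, and `Storage` is assumed to be the name of the enclosing class in synapse/storage/__init__.py:

# Illustrative wiring only: main_store_class stands in for whichever main
# data store this process uses (master vs. worker).
main_store = main_store_class(db_conn, hs)
stores = DataStores(main_store, db_conn, hs)  # also builds the state store

storage = Storage(hs, stores)  # see synapse/storage/__init__.py above
storage.purge_events           # the new PurgeEventsStorage facade
stores.state                   # the new StateGroupDataStore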
181 changes: 6 additions & 175 deletions synapse/storage/data_stores/main/events.py
@@ -1368,6 +1368,10 @@ def purge_history(self, room_id, token, delete_local_events):
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).

Returns:
Deferred[set[int]]: The set of state groups that reference deleted
events.
"""

return self.runInteraction(
@@ -1521,60 +1525,6 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
"[purge] found %i referenced state groups", len(referenced_state_groups)
)

logger.info("[purge] finding state groups that can be deleted")

_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
state_groups_to_delete, remaining_state_groups = _

logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)

logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)

# Now we turn the state groups that reference to-be-deleted state
# groups into non-delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]

self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)

self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)

self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)

logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)

logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
"DELETE FROM event_to_state_groups "
@@ -1661,87 +1611,7 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):

logger.info("[purge] done")

def _find_unreferenced_groups_during_purge(self, txn, state_groups):
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).

Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.

Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}

# Set of state groups that are referenced by events we are not deleting
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound the number of groups we look up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search

# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE ep.event_id IS NULL AND
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "state_group", current_search
)
txn.execute(sql + clause, list(args))

referenced = set(sg for sg, in txn)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group"),
)

prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]

to_delete = state_groups_seen - referenced_groups

to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)

return to_delete, to_dedelta
return referenced_state_groups
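The de-delta pass removed above is the subtle part of this purge logic: state groups are stored as deltas against at most one prev group (see the graph walk in `_find_unreferenced_groups_during_purge`), so a surviving group must be expanded into a standalone snapshot before the groups it builds on are deleted. A toy sketch of that expansion, using plain dicts in place of the `state_groups_state` and `state_group_edges` tables — nothing below is Synapse API:

def resolve_state(group, edges, deltas):
    """Walk the prev-group chain and merge deltas into a full state map."""
    chain = []
    while group is not None:
        chain.append(group)
        group = edges.get(group)  # each group has at most one prev group
    state = {}
    for g in reversed(chain):  # oldest first, so newer deltas win
        state.update(deltas[g])
    return state


def de_delta(group, edges, deltas):
    """Make `group` self-contained so its ancestors can be deleted."""
    deltas[group] = resolve_state(group, edges, deltas)
    edges.pop(group, None)


# Example: group 2 is stored as a delta on top of group 1. After
# de_delta(2, ...), group 1 can be deleted without losing any state.
edges = {2: 1}
deltas = {
    1: {("m.room.name", ""): "$ev1"},
    2: {("m.room.topic", ""): "$ev2"},
}
de_delta(2, edges, deltas)
assert deltas[2] == {
    ("m.room.name", ""): "$ev1",
    ("m.room.topic", ""): "$ev2",
}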

def purge_room(self, room_id):
"""Deletes all record of a room
@@ -1753,46 +1623,7 @@ def purge_room(self, room_id):
return self.runInteraction("purge_room", self._purge_room_txn, room_id)

def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)

txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)

txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)

txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# and then tables which lack an index on room_id but have one on event_id
# First delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
@@ -20,7 +20,6 @@ DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT

@@ -975,40 +975,6 @@ CREATE TABLE state_events (



CREATE TABLE state_group_edges (
state_group bigint NOT NULL,
prev_state_group bigint NOT NULL
);



CREATE SEQUENCE state_group_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;



CREATE TABLE state_groups (
id bigint NOT NULL,
room_id text NOT NULL,
event_id text NOT NULL
);



CREATE TABLE state_groups_state (
state_group bigint NOT NULL,
room_id text NOT NULL,
type text NOT NULL,
state_key text NOT NULL,
event_id text NOT NULL
);



CREATE TABLE stats_stream_pos (
lock character(1) DEFAULT 'X'::bpchar NOT NULL,
stream_id bigint,
@@ -1482,12 +1448,6 @@ ALTER TABLE ONLY state_events
ADD CONSTRAINT state_events_event_id_key UNIQUE (event_id);



ALTER TABLE ONLY state_groups
ADD CONSTRAINT state_groups_pkey PRIMARY KEY (id);



ALTER TABLE ONLY stats_stream_pos
ADD CONSTRAINT stats_stream_pos_lock_key UNIQUE (lock);

@@ -1928,18 +1888,6 @@ CREATE UNIQUE INDEX room_stats_room_ts ON room_stats USING btree (room_id, ts);



CREATE INDEX state_group_edges_idx ON state_group_edges USING btree (state_group);



CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_state_group);



CREATE INDEX state_groups_state_type_idx ON state_groups_state USING btree (state_group, type, state_key);



CREATE INDEX stream_ordering_to_exterm_idx ON stream_ordering_to_exterm USING btree (stream_ordering);


@@ -42,8 +42,6 @@ CREATE INDEX ev_edges_id ON event_edges(event_id);
CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
CREATE TABLE room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) );
CREATE INDEX room_depth_room ON room_depth(room_id);
CREATE TABLE state_groups( id BIGINT PRIMARY KEY, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE TABLE state_groups_state( state_group BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE TABLE event_to_state_groups( event_id TEXT NOT NULL, state_group BIGINT NOT NULL, UNIQUE (event_id) );
CREATE TABLE local_media_repository ( media_id TEXT, media_type TEXT, media_length INTEGER, created_ts BIGINT, upload_name TEXT, user_id TEXT, quarantined_by TEXT, url_cache TEXT, last_access_ts BIGINT, UNIQUE (media_id) );
CREATE TABLE local_media_repository_thumbnails ( media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_type TEXT, thumbnail_method TEXT, thumbnail_length INTEGER, UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) );
@@ -120,9 +118,6 @@ CREATE TABLE device_max_stream_id ( stream_id BIGINT NOT NULL );
CREATE TABLE public_room_list_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, visibility BOOLEAN NOT NULL , appservice_id TEXT, network_id TEXT);
CREATE INDEX public_room_list_stream_idx on public_room_list_stream( stream_id );
CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( room_id, stream_id );
CREATE TABLE state_group_edges( state_group BIGINT NOT NULL, prev_state_group BIGINT NOT NULL );
CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group);
CREATE TABLE stream_ordering_to_exterm ( stream_ordering BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( stream_ordering );
CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( room_id, stream_ordering );
@@ -254,6 +249,5 @@ CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen);
CREATE INDEX users_creation_ts ON users (creation_ts);
CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group);
CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id);
CREATE INDEX state_groups_state_type_idx ON state_groups_state(state_group, type, state_key);
CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id);
CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip);