Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split purge API into events vs state and add PurgeEventsStorage #6295

Merged
merged 14 commits into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname

Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage

__all__ = ["DataStores", "DataStore"]

Expand All @@ -47,6 +48,7 @@ def __init__(self, hs, stores: DataStores):

self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)


def are_all_users_on_domain(txn, database_engine, domain):
Expand Down
29 changes: 20 additions & 9 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
from collections import Counter as c_counter, OrderedDict, namedtuple
from functools import wraps
from typing import Set

from six import iteritems, text_type
from six.moves import range
Expand Down Expand Up @@ -1370,8 +1371,8 @@ def purge_history(self, room_id, token, delete_local_events):
state groups).

Returns:
Deferred[set[int]]: The set of state groups that reference deleted
events.
Deferred[set[int]]: The set of state groups that are referenced by
deleted events.
"""

return self.runInteraction(
Expand Down Expand Up @@ -1508,7 +1509,7 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
[(room_id, event_id) for event_id, in new_backwards_extrems],
)

logger.info("[purge] finding redundant state groups")
logger.info("[purge] finding state groups referenced by deleted events")

# Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1711,9 +1712,16 @@ def _purge_room_txn(self, txn, room_id):

logger.info("[purge] done")

def purge_unreferenced_state_groups(self, room_id, state_groups_to_delete):
def purge_unreferenced_state_groups(
self, room_id: str, state_groups_to_delete: Set[int]
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
) -> defer.Deferred:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.

Args:
room_id: The room the state groups belong to (must all be in the
same room).
state_groups_to_delete: Set of all state groups to delete.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""

return self.runInteraction(
Expand Down Expand Up @@ -1829,8 +1837,10 @@ def _purge_room_state_txn(self, txn, room_id):
txn.execute(
"""
DELETE FROM state_groups_state
INNER JOIN state_groups USING (event_id)
WHEREE state_groups.room_id = ?
WHERE state_group IN (
SELECT state_group FROM state_groups
WHERE room_id = ?
)
""",
(room_id,),
)
Expand All @@ -1841,8 +1851,9 @@ def _purge_room_state_txn(self, txn, room_id):
txn.execute(
"""
DELETE FROM state_group_edges
INNER JOIN state_groups USING (event_id)
WHEREE state_groups.room_id = ?
WHERE state_group IN (
SELECT state_group FROM state_groups
WHERE room_id = ?
)
""",
(room_id,),
Expand All @@ -1853,7 +1864,7 @@ def _purge_room_state_txn(self, txn, room_id):

txn.execute(
"""
DELETE FROM state_groups WHEREE room_id = ?
DELETE FROM state_groups WHERE room_id = ?
""",
(room_id,),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

INSERT INTO background_updates (update_name, progress_json) VALUES
('state_groups_room_id_idx', '{}');
7 changes: 7 additions & 0 deletions synapse/storage/data_stores/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ class StateBackgroundUpdateStore(
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
STATE_GROUPS_ROOM_INDEX_UPDATE_NAME = "state_groups_room_id_idx"

def __init__(self, db_conn, hs):
super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
Expand All @@ -1044,6 +1045,12 @@ def __init__(self, db_conn, hs):
table="event_to_state_groups",
columns=["state_group"],
)
self.register_background_index_update(
richvdh marked this conversation as resolved.
Show resolved Hide resolved
self.STATE_GROUPS_ROOM_INDEX_UPDATE_NAME,
index_name="state_groups_room_id_idx",
table="state_groups",
columns=["room_id"],
)

@defer.inlineCallbacks
def _background_deduplicate_state(self, progress, batch_size):
Expand Down