Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Background update to clean out rooms from current state (#6802)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Jan 30, 2020
1 parent b660327 commit 57ad702
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog.d/6802.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add background update to clean out left rooms from current state.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Schedule the background update that deletes current state events for rooms
-- this server is no longer in. The update starts with empty progress.
INSERT INTO background_updates (update_name, progress_json) VALUES
    ('delete_old_current_state_events', '{}');
108 changes: 106 additions & 2 deletions synapse/storage/data_stores/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
from synapse.storage.database import Database
from synapse.storage.state import StateFilter
from synapse.util.caches import intern_string
Expand Down Expand Up @@ -300,14 +301,17 @@ def get_referenced_state_groups(self, state_groups):
return set(row["state_group"] for row in rows)


class MainStateBackgroundUpdateStore(SQLBaseStore):
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):

CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"

def __init__(self, database: Database, db_conn, hs):
super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)

self.server_name = hs.hostname

self.db.updates.register_background_index_update(
self.CURRENT_STATE_INDEX_UPDATE_NAME,
index_name="current_state_events_member_index",
Expand All @@ -321,6 +325,106 @@ def __init__(self, database: Database, db_conn, hs):
table="event_to_state_groups",
columns=["state_group"],
)
self.db.updates.register_background_update_handler(
self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
)

async def _background_remove_left_rooms(self, progress, batch_size):
    """Purge stored current state for rooms with no joined local users.

    Background update handler that walks `current_state_events` in `room_id`
    order, one batch of rooms per call. A room in the batch is kept if it has
    an `m.room.member` row with membership `join` whose state key ends in
    `:<server_name>`; every other room in the batch has its rows deleted from
    `current_state_events` and `event_forward_extremities`.

    Users who were recorded as joined to a deleted room, and with whom this
    server no longer shares any room, are marked as unsubscribed for device
    list purposes.

    Args:
        progress: stored progress for this update. `last_room_id` is the last
            room ID already handled (taken as "" when absent).
        batch_size: maximum number of distinct room IDs to examine per call.

    Returns:
        `batch_size`, regardless of how many rooms were actually examined.
    """

    resume_after = progress.get("last_room_id", "")

    def _purge_batch_txn(txn):
        # NOTE: the SQL literals in this function are intentionally left
        # flush-left; their text is passed to the database verbatim.
        batch_sql = """
SELECT DISTINCT room_id FROM current_state_events
WHERE room_id > ? ORDER BY room_id LIMIT ?
"""

        txn.execute(batch_sql, (resume_after, batch_size))
        batch = [row[0] for row in txn]
        if not batch:
            # No rooms sort after the last processed one: signal completion.
            return True, set()

        # The batch is ordered by room_id, so its last entry is the upper
        # bound of the range handled in this transaction.
        batch_end = batch[-1]
        local_user_pattern = "%:" + self.server_name

        joined_sql = """
SELECT room_id
FROM current_state_events
WHERE
room_id > ? AND room_id <= ?
AND type = 'm.room.member'
AND membership = 'join'
AND state_key LIKE ?
GROUP BY room_id
"""

        txn.execute(joined_sql, (resume_after, batch_end, local_user_pattern))
        rooms_with_local_join = {row[0] for row in txn}

        rooms_to_purge = set(batch) - rooms_with_local_join

        # Collect every user still recorded as joined to a room we are about
        # to purge *before* deleting anything. There may have been a period
        # where this server shared no room with those users and so missed
        # device updates; their device lists may need marking as stale.
        member_rows = self.db.simple_select_many_txn(
            txn,
            table="current_state_events",
            column="room_id",
            iterable=rooms_to_purge,
            keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
            retcols=("state_key",),
        )
        affected_users = {row["state_key"] for row in member_rows}

        # Remove the purged rooms' rows from both tables.
        self.db.simple_delete_many_txn(
            txn,
            table="current_state_events",
            column="room_id",
            iterable=rooms_to_purge,
            keyvalues={},
        )
        self.db.simple_delete_many_txn(
            txn,
            table="event_forward_extremities",
            column="room_id",
            iterable=rooms_to_purge,
            keyvalues={},
        )

        # Record how far we got, in the same transaction as the deletes.
        self.db.updates._background_update_progress_txn(
            txn,
            self.DELETE_CURRENT_STATE_UPDATE_NAME,
            {"last_room_id": batch_end},
        )

        return False, affected_users

    finished, potentially_left_users = await self.db.runInteraction(
        "_background_remove_left_rooms", _purge_batch_txn
    )

    if finished:
        await self.db.updates._end_background_update(
            self.DELETE_CURRENT_STATE_UPDATE_NAME
        )

    # Of the users seen in purged rooms, find those we still share some other
    # room with; only the remainder have their device lists marked as
    # unsubscribed.
    still_shared = await self.get_users_server_still_shares_room_with(
        potentially_left_users
    )
    no_longer_shared = potentially_left_users - still_shared

    for user_id in no_longer_shared:
        await self.mark_remote_user_device_list_as_unsubscribed(user_id)

    return batch_size


class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
Expand Down

0 comments on commit 57ad702

Please sign in to comment.