From 9cd7610f86ab5051c9365dd38d1eec405a5f8ca6 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Feb 2023 15:26:55 +0000 Subject: [PATCH] Revert "Add `event_stream_ordering` column to membership state tables (#14979)" This reverts commit 5fdc12f482c68e2cdbb78d7db5de2cfe621720d4. --- synapse/storage/databases/main/events.py | 23 +--- .../databases/main/events_bg_updates.py | 104 +----------------- .../storage/databases/main/events_worker.py | 8 +- ...embership_tables_event_stream_ordering.sql | 21 ---- 4 files changed, 11 insertions(+), 145 deletions(-) delete mode 100644 synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b6cce0a7ccf1..1536937b67e8 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1147,15 +1147,11 @@ def _update_current_state_txn( # been inserted into room_memberships. txn.execute_batch( """INSERT INTO current_state_events - (room_id, type, state_key, event_id, membership, event_stream_ordering) - VALUES ( - ?, ?, ?, ?, - (SELECT membership FROM room_memberships WHERE event_id = ?), - (SELECT stream_ordering FROM events WHERE event_id = ?) - ) + (room_id, type, state_key, event_id, membership) + VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) """, [ - (room_id, key[0], key[1], ev_id, ev_id, ev_id) + (room_id, key[0], key[1], ev_id, ev_id) for key, ev_id in to_insert.items() ], ) @@ -1182,15 +1178,11 @@ def _update_current_state_txn( if to_insert: txn.execute_batch( """INSERT INTO local_current_membership - (room_id, user_id, event_id, membership, event_stream_ordering) - VALUES ( - ?, ?, ?, - (SELECT membership FROM room_memberships WHERE event_id = ?), - (SELECT stream_ordering FROM events WHERE event_id = ?) - ) + (room_id, user_id, event_id, membership) + VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) """, [ - (room_id, key[1], ev_id, ev_id, ev_id) + (room_id, key[1], ev_id, ev_id) for key, ev_id in to_insert.items() if key[0] == EventTypes.Member and self.is_mine_id(key[1]) ], @@ -1798,7 +1790,6 @@ def _store_room_members_txn( table="room_memberships", keys=( "event_id", - "event_stream_ordering", "user_id", "sender", "room_id", @@ -1809,7 +1800,6 @@ def _store_room_members_txn( values=[ ( event.event_id, - event.internal_metadata.stream_ordering, event.state_key, event.user_id, event.room_id, @@ -1842,7 +1832,6 @@ def _store_room_members_txn( keyvalues={"room_id": event.room_id, "user_id": event.state_key}, values={ "event_id": event.event_id, - "event_stream_ordering": event.internal_metadata.stream_ordering, "membership": event.membership, }, ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 0e81d38ccaee..b9d3c36d602c 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -17,7 +17,7 @@ import attr -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import EventContentFields, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -71,10 +71,6 @@ class _BackgroundUpdates: EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" - POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = ( - "populate_membership_event_stream_ordering" - ) - @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -103,10 +99,6 @@ def __init__( ): super().__init__(database, db_conn, hs) - self.db_pool.updates.register_background_update_handler( - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, - self._populate_membership_event_stream_ordering, - ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts, @@ -1506,97 +1498,3 @@ def _populate_txn(txn: LoggingTransaction) -> bool: ) return batch_size - - async def _populate_membership_event_stream_ordering( - self, progress: JsonDict, batch_size: int - ) -> int: - def _populate_membership_event_stream_ordering( - txn: LoggingTransaction, - ) -> bool: - - if "max_stream_ordering" in progress: - max_stream_ordering = progress["max_stream_ordering"] - else: - txn.execute("SELECT max(stream_ordering) FROM events") - res = txn.fetchone() - if res is None or res[0] is None: - return True - else: - max_stream_ordering = res[0] - - start = progress.get("stream_ordering", 0) - stop = start + batch_size - - sql = f""" - SELECT room_id, event_id, stream_ordering - FROM events - WHERE - type = '{EventTypes.Member}' - AND stream_ordering >= ? - AND stream_ordering < ? - """ - txn.execute(sql, (start, stop)) - - rows: List[Tuple[str, str, int]] = cast( - List[Tuple[str, str, int]], txn.fetchall() - ) - - event_ids: List[Tuple[str]] = [] - event_stream_orderings: List[Tuple[int]] = [] - - for _, event_id, event_stream_ordering in rows: - event_ids.append((event_id,)) - event_stream_orderings.append((event_stream_ordering,)) - - self.db_pool.simple_update_many_txn( - txn, - table="current_state_events", - key_names=("event_id",), - key_values=event_ids, - value_names=("event_stream_ordering",), - value_values=event_stream_orderings, - ) - - self.db_pool.simple_update_many_txn( - txn, - table="room_memberships", - key_names=("event_id",), - key_values=event_ids, - value_names=("event_stream_ordering",), - value_values=event_stream_orderings, - ) - - # NOTE: local_current_membership has no index on event_id, so only - # the room ID here will reduce the query rows read. - for room_id, event_id, event_stream_ordering in rows: - txn.execute( - """ - UPDATE local_current_membership - SET event_stream_ordering = ? - WHERE room_id = ? AND event_id = ? - """, - (event_stream_ordering, room_id, event_id), - ) - - self.db_pool.updates._background_update_progress_txn( - txn, - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, - { - "stream_ordering": stop, - "max_stream_ordering": max_stream_ordering, - }, - ) - - return stop > max_stream_ordering - - finished = await self.db_pool.runInteraction( - "_populate_membership_event_stream_ordering", - _populate_membership_event_stream_ordering, - ) - - if finished: - await self.db_pool.updates._end_background_update( - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING - ) - - return batch_size diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 6d0ef10258de..d7d08369cacd 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1779,7 +1779,7 @@ def get_ex_outlier_stream_rows_txn( txn: LoggingTransaction, ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( - "SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type," + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," " e.outlier" " FROM events AS e" @@ -1791,10 +1791,10 @@ def get_ex_outlier_stream_rows_txn( " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" - " WHERE ? < out.event_stream_ordering" - " AND out.event_stream_ordering <= ?" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" " AND out.instance_name = ?" - " ORDER BY out.event_stream_ordering ASC" + " ORDER BY event_stream_ordering ASC" ) txn.execute(sql, (last_id, current_id, instance_name)) diff --git a/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql deleted file mode 100644 index 7c30a67fc4d1..000000000000 --- a/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql +++ /dev/null @@ -1,21 +0,0 @@ -/* Copyright 2022 Beeper - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; - -INSERT INTO background_updates (update_name, progress_json) VALUES - ('populate_membership_event_stream_ordering', '{}');