This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add an index to event_search #2218

Merged
merged 4 commits on May 16, 2017
21 changes: 16 additions & 5 deletions synapse/storage/background_updates.py
@@ -210,7 +210,9 @@ def register_background_update_handler(self, update_name, update_handler):
         self._background_update_handlers[update_name] = update_handler
 
     def register_background_index_update(self, update_name, index_name,
-                                         table, columns, where_clause=None):
+                                         table, columns, where_clause=None,
+                                         unique=False,
+                                         psql_only=False):
         """Helper for store classes to do a background index addition
 
         To use:
@@ -226,6 +228,9 @@ def register_background_index_update(self, update_name, index_name,
             index_name (str): name of index to add
             table (str): table to add index to
             columns (list[str]): columns/expressions to include in index
+            unique (bool): true to make a UNIQUE index
+            psql_only: true to only create this index on psql databases (useful
+                for virtual sqlite tables)
         """
 
         def create_index_psql(conn):
@@ -245,9 +250,11 @@ def create_index_psql(conn):
                 c.execute(sql)
 
                 sql = (
-                    "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
+                    "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
+                    " ON %(table)s"
                     " (%(columns)s) %(where_clause)s"
                 ) % {
+                    "unique": "UNIQUE" if unique else "",
                     "name": index_name,
                     "table": table,
                     "columns": ", ".join(columns),
@@ -270,9 +277,10 @@ def create_index_sqlite(conn):
             # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
             # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
             sql = (
-                "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
+                "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
                 " (%(columns)s)"
             ) % {
+                "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
@@ -284,13 +292,16 @@ def create_index_sqlite(conn):
 
         if isinstance(self.database_engine, engines.PostgresEngine):
             runner = create_index_psql
+        elif psql_only:
+            runner = None
         else:
             runner = create_index_sqlite
 
         @defer.inlineCallbacks
         def updater(progress, batch_size):
-            logger.info("Adding index %s to %s", index_name, table)
-            yield self.runWithConnection(runner)
+            if runner is not None:
+                logger.info("Adding index %s to %s", index_name, table)
+                yield self.runWithConnection(runner)
             yield self._end_background_update(update_name)
             defer.returnValue(1)
 
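For illustration, here is a minimal sketch of how a store class might use the two new parameters; the update, index, table and column names are hypothetical, not part of this change:

# Hypothetical registration in a store's __init__ (names are examples only).
# unique=True makes the helper emit CREATE UNIQUE INDEX rather than plain
# CREATE INDEX; psql_only=True leaves runner as None on SQLite, so the
# updater marks the update finished there without touching the database.
self.register_background_index_update(
    "my_table_thing_idx",
    index_name="my_table_thing_idx",
    table="my_table",
    columns=["thing_id"],
    unique=True,
    psql_only=True,
)

On PostgreSQL this would eventually run CREATE UNIQUE INDEX CONCURRENTLY my_table_thing_idx ON my_table (thing_id); the CONCURRENTLY form cannot run inside a transaction block, hence the separate create_index_psql code path.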
33 changes: 26 additions & 7 deletions synapse/storage/events.py
@@ -207,6 +207,18 @@ def __init__(self, hs):
             where_clause="contains_url = true AND outlier = false",
         )
 
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+            psql_only=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
     def persist_events(self, events_and_contexts, backfilled=False):
@@ -2022,6 +2034,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
+        logger.debug("[purge] looking for events to delete")
+
         txn.execute(
             "SELECT event_id, state_key FROM events"
             " LEFT JOIN state_events USING (room_id, event_id)"
@@ -2030,6 +2044,14 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
         )
         event_rows = txn.fetchall()
 
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        logger.info(
+            "[purge] found %i events before cutoff, of which %i are remote"
+            " non-state events to delete", len(event_rows), len(to_delete))
+
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
@@ -2080,6 +2102,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
         )
 
         state_rows = txn.fetchall()
+        logger.debug("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
@@ -2173,10 +2196,6 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
         )
 
         # Delete all remote non-state events
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         for table in (
             "events",
             "event_json",
@@ -2192,15 +2211,15 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing non-state events from %s", table)
+            logger.debug("[purge] removing remote non-state events from %s", table)
 
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
                 to_delete
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking events as outliers")
+        logger.debug("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
@@ -2210,7 +2229,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
             ]
         )
 
-        logger.debug("[purge] done")
+        logger.info("[purge] done")
 
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
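The ordering change above is worth noting: to_delete is now computed (and logged) as soon as the events are fetched, rather than just before the DELETEs. A standalone sketch of the split it encodes, with made-up event IDs and a hypothetical stand-in for hs.is_mine_id:

# Illustrative sketch of the purge split; not synapse code.
event_rows = [
    ("$a:remote.example", None),      # remote non-state event -> hard delete
    ("$b:local.example", None),       # local non-state event  -> marked outlier
    ("$c:remote.example", "@u:x"),    # state event            -> marked outlier
]

def is_mine_id(event_id, server_name="local.example"):
    # stand-in for hs.is_mine_id: compares the domain part of the ID
    return event_id.split(":", 1)[1] == server_name

to_delete = [
    (event_id,) for event_id, state_key in event_rows
    if state_key is None and not is_mine_id(event_id)
]
assert to_delete == [("$a:remote.example",)]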
4 changes: 4 additions & 0 deletions synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -36,6 +36,10 @@
 -- and is used incredibly rarely.
 DROP INDEX IF EXISTS events_order_topo_stream_room;
 
+-- an equivalent index to this actually gets re-created in delta 41, because it
+-- turned out that deleting it wasn't a great plan :/. In any case, let's
+-- delete it here, and delta 41 will create a new one with an added UNIQUE
+-- constraint
 DROP INDEX IF EXISTS event_search_ev_idx;
 """
 
17 changes: 17 additions & 0 deletions synapse/storage/schema/delta/41/event_search_event_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+VALUES ('event_search_event_id_idx', '{}');
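Note that this delta only queues the work: it adds a row to background_updates, and the handler registered in events.py above picks it up when the background updater next runs. Substituting the registered arguments into the PostgreSQL template from background_updates.py, the statement the update should eventually run works out as in this sketch:

# Sketch: reproduce the SQL string the psql runner builds for this update
# (unique=True, no where_clause), per the template in background_updates.py.
sql = (
    "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
    " ON %(table)s"
    " (%(columns)s) %(where_clause)s"
) % {
    "unique": "UNIQUE",
    "name": "event_search_event_id_idx",
    "table": "event_search",
    "columns": "event_id",
    "where_clause": "",
}
print(sql)
# -> CREATE UNIQUE INDEX CONCURRENTLY event_search_event_id_idx ON event_search (event_id)

On SQLite nothing runs at all (psql_only=True), which fits the docstring's note about virtual SQLite tables: event_search is an FTS virtual table there and cannot take an ordinary index.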