Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Handle half-created indices in receipts index background update (#14650)
Browse files Browse the repository at this point in the history
When Synapse is terminated while running the background update to create
the `receipts_graph` or `receipts_linearized` indexes, the indexes may
be successfully created (or marked as invalid on postgres) while the
background update remains unfinished. When Synapse next starts up, the
background update will fail because the index already exists, or exists
but is invalid on postgres.

Use the existing code to create indices in background updates, since it
handles these edge cases.

Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
squahtx authored Dec 9, 2022
1 parent 3ac412b commit 373c485
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 48 deletions.
2 changes: 2 additions & 0 deletions changelog.d/14650.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted.

55 changes: 46 additions & 9 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,48 @@ def register_background_index_update(
The named index will be dropped upon completion of the new index.
"""

async def updater(progress: JsonDict, batch_size: int) -> int:
    # One-shot update: delegate to the shared index-creation helper, which
    # (per this change) copes with an index that was already created — or
    # left invalid on postgres — by a previous, interrupted run.
    # `progress` and `batch_size` are unused: the whole index is built in
    # a single step.
    await self.create_index_in_background(
        index_name=index_name,
        table=table,
        columns=columns,
        where_clause=where_clause,
        unique=unique,
        psql_only=psql_only,
        replaces_index=replaces_index,
    )
    # Mark the background update as finished so it is not re-run on restart.
    await self._end_background_update(update_name)
    return 1

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
    updater, oneshot=True
)

async def create_index_in_background(
    self,
    index_name: str,
    table: str,
    columns: Iterable[str],
    where_clause: Optional[str] = None,
    unique: bool = False,
    psql_only: bool = False,
    replaces_index: Optional[str] = None,
) -> None:
    """Add an index in the background.

    Blocks the caller until the index has been created. Intended to be
    called from a background update, and (per the surrounding change)
    tolerates an index left behind — possibly invalid, on postgres — by
    a previously interrupted attempt.

    Args:
        index_name: name of index to add
        table: table to add index to
        columns: columns/expressions to include in index
        where_clause: A WHERE clause to specify a partial unique index.
        unique: true to make a UNIQUE index
        psql_only: true to only create this index on psql databases (useful
            for virtual sqlite tables)
        replaces_index: The name of an index that this index replaces.
            The named index will be dropped upon completion of the new index.
    """

def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
Expand Down Expand Up @@ -618,16 +660,11 @@ def create_index_sqlite(conn: Connection) -> None:
else:
runner = create_index_sqlite

async def updater(progress: JsonDict, batch_size: int) -> int:
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)
await self._end_background_update(update_name)
return 1
if runner is None:
return

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)

async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
Expand Down
51 changes: 12 additions & 39 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,39 +924,6 @@ def _populate_receipt_event_stream_ordering_txn(

return batch_size

async def _create_receipts_index(self, index_name: str, table: str) -> None:
    """Build a unique partial index on `(room_id, receipt_type, user_id)`
    for the given receipts table, restricted to non-thread receipts
    (rows where `thread_id` is NULL).
    """

    def _build_index(conn: LoggingDatabaseConnection) -> None:
        conn.rollback()

        on_postgres = isinstance(self.database_engine, PostgresEngine)

        # Postgres refuses to run CREATE INDEX CONCURRENTLY inside a
        # transaction, so the session must be put into autocommit mode.
        if on_postgres:
            conn.set_session(autocommit=True)

        try:
            # Now that the duplicates are gone, we can create the index.
            concurrently = "CONCURRENTLY" if on_postgres else ""
            conn.cursor().execute(
                f"""
            CREATE UNIQUE INDEX {concurrently} {index_name}
            ON {table}(room_id, receipt_type, user_id)
            WHERE thread_id IS NULL
            """
            )
        finally:
            # Restore the default transactional behaviour for this session.
            if on_postgres:
                conn.set_session(autocommit=False)

    await self.db_pool.runWithConnection(_build_index)

async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
Expand Down Expand Up @@ -999,9 +966,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_linearized_unique_index",
"receipts_linearized",
await self.db_pool.updates.create_index_in_background(
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down Expand Up @@ -1050,9 +1020,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_graph_unique_index",
"receipts_graph",
await self.db_pool.updates.create_index_in_background(
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down

0 comments on commit 373c485

Please sign in to comment.