Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use execute_batch in more places (#9188)
Browse files Browse the repository at this point in the history
* Use execute_batch in more places

* Newsfile
  • Loading branch information
erikjohnston authored Jan 21, 2021
1 parent c55e625 commit 7a43482
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 31 deletions.
1 change: 1 addition & 0 deletions changelog.d/9188.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up batch insertion when using PostgreSQL.
6 changes: 6 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ def description(self) -> Any:
return self.txn.description

def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
"""Similar to `executemany`, except `txn.rowcount` will not be correct
afterwards.
More efficient than `executemany` on PostgreSQL
"""

if isinstance(self.database_engine, PostgresEngine):
from psycopg2.extras import execute_batch # type: ignore

Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ def _prune_txn(txn):
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))
txn.execute_batch(sql, ((row[0], row[1]) for row in rows))

logger.info("Pruned %d device list outbound pokes", count)

Expand Down Expand Up @@ -1343,7 +1343,7 @@ def _add_device_change_to_stream_txn(

# Delete older entries in the table, as we really only care about
# when the latest change happened.
txn.executemany(
txn.execute_batch(
"""
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def _add_push_actions_to_staging_txn(txn):
VALUES (?, ?, ?, ?, ?, ?)
"""

txn.executemany(
txn.execute_batch(
sql,
(
_gen_entry(user_id, actions)
Expand Down Expand Up @@ -803,7 +803,7 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
],
)

txn.executemany(
txn.execute_batch(
"""
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
Expand Down
12 changes: 2 additions & 10 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ async def _background_reindex_fields_sender(self, progress, batch_size):
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

INSERT_CLUMP_SIZE = 1000

def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
Expand Down Expand Up @@ -178,9 +176,7 @@ def reindex_txn(txn):

sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
txn.execute_batch(sql, update_rows)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
Expand Down Expand Up @@ -210,8 +206,6 @@ async def _background_reindex_origin_server_ts(self, progress, batch_size):
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

INSERT_CLUMP_SIZE = 1000

def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
Expand Down Expand Up @@ -256,9 +250,7 @@ def reindex_search_txn(txn):

sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
txn.execute_batch(sql, rows_to_update)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
Expand Down
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def update_cache_txn(txn):
" WHERE media_origin = ? AND media_id = ?"
)

txn.executemany(
txn.execute_batch(
sql,
(
(time_ms, media_origin, media_id)
Expand All @@ -430,7 +430,7 @@ def update_cache_txn(txn):
" WHERE media_id = ?"
)

txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))

return await self.db_pool.runInteraction(
"update_cached_last_access_time", update_cache_txn
Expand Down Expand Up @@ -557,7 +557,7 @@ async def delete_url_cache(self, media_ids):
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"

def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

return await self.db_pool.runInteraction(
"delete_url_cache", _delete_url_cache_txn
Expand Down Expand Up @@ -586,11 +586,11 @@ async def delete_url_cache_media(self, media_ids):
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository WHERE media_id = ?"

txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"

txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

return await self.db_pool.runInteraction(
"delete_url_cache_media", _delete_url_cache_media_txn
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
)

            # Update backward extremities
txn.executemany(
txn.execute_batch(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems],
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ def _bg_user_threepids_grandfather_txn(txn):
FROM user_threepids
"""

txn.executemany(sql, [(id_server,) for id_server in id_servers])
txn.execute_batch(sql, [(id_server,) for id_server in id_servers])

if id_servers:
await self.db_pool.runInteraction(
Expand Down
6 changes: 1 addition & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,6 @@ async def _background_add_membership_profile(self, progress, batch_size):
"max_stream_id_exclusive", self._stream_order_on_start + 1
)

INSERT_CLUMP_SIZE = 1000

def add_membership_profile_txn(txn):
sql = """
SELECT stream_ordering, event_id, events.room_id, event_json.json
Expand Down Expand Up @@ -915,9 +913,7 @@ def add_membership_profile_txn(txn):
UPDATE room_memberships SET display_name = ?, avatar_url = ?
WHERE event_id = ? AND room_id = ?
"""
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
clump = to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(to_update_sql, clump)
txn.execute_batch(to_update_sql, to_update)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
# { "ignored_users": "@someone:example.org": {} }
ignored_users = content.get("ignored_users", {})
if isinstance(ignored_users, dict) and ignored_users:
cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])

# Add indexes after inserting data for efficiency.
logger.info("Adding constraints to ignored_users table")
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def store_search_entries_txn(self, txn, entries):
for entry in entries
)

txn.executemany(sql, args)
txn.execute_batch(sql, args)

elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
Expand All @@ -75,7 +75,7 @@ def store_search_entries_txn(self, txn, entries):
for entry in entries
)

txn.executemany(sql, args)
txn.execute_batch(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,11 @@ def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete)
)

logger.info("[purge] removing redundant state groups")
txn.executemany(
txn.execute_batch(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
txn.execute_batch(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
Expand Down

0 comments on commit 7a43482

Please sign in to comment.