
Use execute_batch in more places #9188

Merged: 2 commits, Jan 21, 2021
1 change: 1 addition & 0 deletions changelog.d/9188.misc
@@ -0,0 +1 @@
+ Speed up batch insertion when using PostgreSQL.
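
For context, psycopg2 runs `executemany` as one statement execution per parameter set, while `psycopg2.extras.execute_batch` concatenates the statements into pages (100 per round trip by default), which is where the speed-up comes from. A minimal standalone sketch of the fast path; the function name, connection handling, and the table/column names are illustrative, not taken from this PR:

from psycopg2.extras import execute_batch

def bulk_touch_media(conn, rows):
    # rows: an iterable of (time_ms, media_id) tuples.
    # execute_batch sends the statements in pages of 100 (its default
    # page_size), so round trips drop from len(rows) to roughly
    # len(rows) / 100 compared with cursor.executemany.
    sql = "UPDATE local_media_repository SET last_access_ts = %s WHERE media_id = %s"
    with conn.cursor() as cur:
        execute_batch(cur, sql, rows)
    conn.commit()

The new `txn.execute_batch` helper in database.py (next hunk) routes to this psycopg2 call whenever the configured engine is PostgreSQL.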
6 changes: 6 additions & 0 deletions synapse/storage/database.py
@@ -262,6 +262,12 @@ def description(self) -> Any:
return self.txn.description

def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
"""Similar to `executemany`, except `txn.rowcount` will not be correct
afterwards.

More efficient than `executemany` on PostgreSQL
"""

if isinstance(self.database_engine, PostgresEngine):
from psycopg2.extras import execute_batch # type: ignore

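The rest of the helper's body is truncated in this view. From the docstring and the PostgresEngine check above, the shape is presumably: hand the whole argument iterable to psycopg2's execute_batch on PostgreSQL, and fall back to the plain DB-API call otherwise. A hedged sketch of that dispatch, written as a free function (the name execute_batch_compat, the is_postgres flag, and the executemany fallback are assumptions, not the verbatim Synapse code):

from typing import Any, Iterable

def execute_batch_compat(txn, is_postgres: bool, sql: str, args: Iterable[Iterable[Any]]) -> None:
    # Assumed shape of the helper added above: use psycopg2's fast path on
    # PostgreSQL, otherwise fall back to the stock DB-API executemany.
    if is_postgres:
        from psycopg2.extras import execute_batch
        # Groups the statements into pages, cutting client/server round trips.
        execute_batch(txn, sql, args)
    else:
        txn.executemany(sql, args)

In Synapse itself the dispatch lives on the transaction wrapper (self.txn, self.database_engine), which is why the call sites below can simply switch txn.executemany to txn.execute_batch.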
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/devices.py
@@ -897,7 +897,7 @@ def _prune_txn(txn):
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
"""
- txn.executemany(sql, ((row[0], row[1]) for row in rows))
+ txn.execute_batch(sql, ((row[0], row[1]) for row in rows))

logger.info("Pruned %d device list outbound pokes", count)

@@ -1343,7 +1343,7 @@ def _add_device_change_to_stream_txn(

# Delete older entries in the table, as we really only care about
# when the latest change happened.
- txn.executemany(
+ txn.execute_batch(
"""
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/event_push_actions.py
@@ -487,7 +487,7 @@ def _add_push_actions_to_staging_txn(txn):
VALUES (?, ?, ?, ?, ?, ?)
"""

- txn.executemany(
+ txn.execute_batch(
sql,
(
_gen_entry(user_id, actions)
@@ -803,7 +803,7 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
],
)

- txn.executemany(
+ txn.execute_batch(
"""
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
12 changes: 2 additions & 10 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -139,8 +139,6 @@ async def _background_reindex_fields_sender(self, progress, batch_size):
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

- INSERT_CLUMP_SIZE = 1000

def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
@@ -178,9 +176,7 @@ def reindex_txn(txn):

sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

- for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-     clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-     txn.executemany(sql, clump)
Comment on lines -181 to -183

Member:
After this change, this will essentially end up as txn.executemany(sql, update_rows) for SQLite. Is that OK or should we do this clumping inside of our execute_batch method for SQLite?

Member (Author):
I think the clumping is a bit spurious, I believe executemany is fine handling large lists.

Member:
The docs don't say anything about trying to limit the length of the iterator used, so... 🤷

+ txn.execute_batch(sql, update_rows)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
@@ -210,8 +206,6 @@ async def _background_reindex_origin_server_ts(self, progress, batch_size):
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

- INSERT_CLUMP_SIZE = 1000

def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
@@ -256,9 +250,7 @@ def reindex_search_txn(txn):

sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

- for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-     clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-     txn.executemany(sql, clump)
+ txn.execute_batch(sql, rows_to_update)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
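On the reviewer's question above about keeping the clumping for SQLite: the PR removes the INSERT_CLUMP_SIZE loops outright. If one did want bounded-size batches inside a helper instead, it could look roughly like the sketch below; the names _clumps and executemany_clumped are hypothetical, and the 1000 default merely echoes the deleted constant:

from itertools import islice
from typing import Any, Iterable, Iterator, List

def _clumps(args: Iterable[Iterable[Any]], size: int) -> Iterator[List[Iterable[Any]]]:
    # Yield successive lists of at most `size` parameter tuples.
    it = iter(args)
    while True:
        clump = list(islice(it, size))
        if not clump:
            return
        yield clump

def executemany_clumped(txn, sql: str, args: Iterable[Iterable[Any]], clump_size: int = 1000) -> None:
    # Feed executemany bounded-size clumps, mirroring the loops this PR drops.
    for clump in _clumps(args, clump_size):
        txn.executemany(sql, clump)

As the thread concludes, sqlite3's executemany accepts an arbitrarily long iterable, so the pre-chunking is not needed here and dropping it is the simpler choice.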
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/media_repository.py
@@ -417,7 +417,7 @@ def update_cache_txn(txn):
" WHERE media_origin = ? AND media_id = ?"
)

- txn.executemany(
+ txn.execute_batch(
sql,
(
(time_ms, media_origin, media_id)
@@ -430,7 +430,7 @@ def update_cache_txn(txn):
" WHERE media_id = ?"
)

- txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
+ txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))

return await self.db_pool.runInteraction(
"update_cached_last_access_time", update_cache_txn
@@ -557,7 +557,7 @@ async def delete_url_cache(self, media_ids):
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"

def _delete_url_cache_txn(txn):
- txn.executemany(sql, [(media_id,) for media_id in media_ids])
+ txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

return await self.db_pool.runInteraction(
"delete_url_cache", _delete_url_cache_txn
@@ -586,11 +586,11 @@ async def delete_url_cache_media(self, media_ids):
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository WHERE media_id = ?"

- txn.executemany(sql, [(media_id,) for media_id in media_ids])
+ txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"

- txn.executemany(sql, [(media_id,) for media_id in media_ids])
+ txn.execute_batch(sql, [(media_id,) for media_id in media_ids])

return await self.db_pool.runInteraction(
"delete_url_cache_media", _delete_url_cache_media_txn
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/purge_events.py
@@ -172,7 +172,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
)

# Update backward extremeties
- txn.executemany(
+ txn.execute_batch(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems],
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/registration.py
@@ -1104,7 +1104,7 @@ def _bg_user_threepids_grandfather_txn(txn):
FROM user_threepids
"""

- txn.executemany(sql, [(id_server,) for id_server in id_servers])
+ txn.execute_batch(sql, [(id_server,) for id_server in id_servers])

if id_servers:
await self.db_pool.runInteraction(
6 changes: 1 addition & 5 deletions synapse/storage/databases/main/roommember.py
@@ -873,8 +873,6 @@ async def _background_add_membership_profile(self, progress, batch_size):
"max_stream_id_exclusive", self._stream_order_on_start + 1
)

- INSERT_CLUMP_SIZE = 1000

def add_membership_profile_txn(txn):
sql = """
SELECT stream_ordering, event_id, events.room_id, event_json.json
@@ -915,9 +913,7 @@ def add_membership_profile_txn(txn):
UPDATE room_memberships SET display_name = ?, avatar_url = ?
WHERE event_id = ? AND room_id = ?
"""
- for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-     clump = to_update[index : index + INSERT_CLUMP_SIZE]
-     txn.executemany(to_update_sql, clump)
+ txn.execute_batch(to_update_sql, to_update)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
(schema delta file; path not shown in this capture)
@@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
# { "ignored_users": "@someone:example.org": {} }
ignored_users = content.get("ignored_users", {})
if isinstance(ignored_users, dict) and ignored_users:
- cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
+ cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])

# Add indexes after inserting data for efficiency.
logger.info("Adding constraints to ignored_users table")
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/search.py
@@ -63,7 +63,7 @@ def store_search_entries_txn(self, txn, entries):
for entry in entries
)

- txn.executemany(sql, args)
+ txn.execute_batch(sql, args)

elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
@@ -75,7 +75,7 @@ def store_search_entries_txn(self, txn, entries):
for entry in entries
)

- txn.executemany(sql, args)
+ txn.execute_batch(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
4 changes: 2 additions & 2 deletions synapse/storage/databases/state/store.py
@@ -565,11 +565,11 @@ def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete)
)

logger.info("[purge] removing redundant state groups")
- txn.executemany(
+ txn.execute_batch(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
- txn.executemany(
+ txn.execute_batch(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
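
One detail worth noting in the state-store hunk above: each DELETE gets its own generator expression over state_groups_to_delete. A generator is exhausted after a single pass, so reusing one object for both statements would silently delete nothing the second time; a small standalone illustration:

state_groups_to_delete = [101, 102, 103]

params = ((sg,) for sg in state_groups_to_delete)
first = list(params)   # [(101,), (102,), (103,)]
second = list(params)  # [] -- the generator is already exhausted
assert first == [(101,), (102,), (103,)]
assert second == []

The converted call sites therefore keep their original argument shapes (lists of tuples or fresh generator expressions), which both psycopg2's execute_batch and sqlite3's executemany accept.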