Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up rebuilding of the user directory for local users #15529

Merged
merged 7 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,21 +386,30 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
self.executemany(sql, args)

def execute_values(
self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True
self,
sql: str,
values: Iterable[Iterable[Any]],
template: Optional[str] = None,
fetch: bool = True,
) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.

The `fetch` parameter must be set to False if the query does not return
rows (e.g. INSERTs).

The `template` is the snippet to merge to every item in argslist to
compose the query.
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values

return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch),
lambda the_sql: execute_values(
self.txn, the_sql, values, template=template, fetch=fetch
),
sql,
)

Expand Down
184 changes: 118 additions & 66 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
cast,
)

import attr

try:
# Figure out if ICU support is available for searching users.
import icu
Expand Down Expand Up @@ -66,6 +68,17 @@
TEMP_TABLE = "_temp_populate_user_directory"


@attr.s(auto_attribs=True, frozen=True)
class _UserDirProfile:
"""Helper type for the user directory code for an entry to be inserted into
the directory.
"""

user_id: str
display_name: Optional[str] = None
avatar_url: Optional[str] = None


class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
Expand Down Expand Up @@ -388,18 +401,23 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
user_id, profile.display_name, profile.avatar_url
)

# We've finished processing a user. Delete it from the table.
await self.db_pool.simple_delete_one(
TEMP_TABLE + "_users", {"user_id": user_id}
)
# Update the remaining counter.
progress["remaining"] -= 1
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
# We've finished processing the users. Delete it from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
)

# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)

return len(users_to_work_on)

Expand Down Expand Up @@ -588,68 +606,102 @@ async def update_profile_in_user_dir(
display_name = non_null_str_or_none(display_name)
avatar_url = non_null_str_or_none(avatar_url)

def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_txn(
await self.db_pool.runInteraction(
"update_profile_in_user_dir",
self._update_profile_in_user_dir_txn,
[_UserDirProfile(user_id, display_name, avatar_url)],
)

def _update_profile_in_user_dir_txn(
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self,
txn: LoggingTransaction,
profiles: Sequence[_UserDirProfile],
) -> None:
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("display_name", "avatar_url"),
value_values=[
(
non_null_str_or_none(p.display_name),
non_null_str_or_none(p.avatar_url),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see the equivalent of these somewhere in the single-profile version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's happening in update_profile_in_user_dir. I realise that actually we should a) only do it once, and b) we need to apply it to all places we use disaply_name, so I've moved it into the attrs class

)
for p in profiles
],
)

# Remote users: Make sure the profile is not marked as stale anymore.
remote_users = [
p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id)
]
if remote_users:
self.db_pool.simple_delete_many_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
table="user_directory_stale_remote_users",
column="user_id",
values=remote_users,
keyvalues={},
)

if not self.hs.is_mine_id(user_id):
# Remote users: Make sure the profile is not marked as stale anymore.
self.db_pool.simple_delete_txn(
txn,
table="user_directory_stale_remote_users",
keyvalues={"user_id": user_id},
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
template = """
(
%s,
setweight(to_tsvector('simple', %s), 'A')
|| setweight(to_tsvector('simple', %s), 'D')
|| setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
)
"""

# The display name that goes into the database index.
index_display_name = display_name
if index_display_name is not None:
index_display_name = _filter_text_for_index(index_display_name)

if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute_values(
sql,
[
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
index_display_name,
),
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = (
"%s %s" % (user_id, index_display_name)
if index_display_name
else user_id
)
self.db_pool.simple_upsert_txn(
txn,
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
)
for p in profiles
],
template=template,
fetch=False,
)
elif isinstance(self.database_engine, Sqlite3Engine):
values = []
for p in profiles:
if p.display_name is not None:
index_display_name = _filter_text_for_index(p.display_name)
value = f"{p.user_id} {index_display_name}"
else:
value = p.user_id

txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
values.append((value,))

await self.db_pool.runInteraction(
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
)
self.db_pool.simple_upsert_many_txn(
txn,
table="user_directory_search",
key_names=("user_id",),
key_values=[(p.user_id,) for p in profiles],
value_names=("value",),
value_values=values,
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")

for p in profiles:
txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,))

async def add_users_who_share_private_room(
self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
Expand Down