Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve the performance of calculating ignored users in large rooms #9024

Merged
merged 15 commits into from
Jan 7, 2021
Merged
1 change: 1 addition & 0 deletions changelog.d/9024.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved performance when calculating ignored users in large rooms.
12 changes: 8 additions & 4 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,18 @@ async def action_for_event_by_user(

condition_cache = {} # type: Dict[str, bool]

# If the event is not a state event check if any users ignore the sender.
if not event.is_state():
ignorers = await self.store.ignored_by(event.sender)
else:
ignorers = set()

for uid, rules in rules_by_user.items():
if event.sender == uid:
continue

if not event.is_state():
is_ignored = await self.store.is_ignored_by(event.sender, uid)
if is_ignored:
continue
if uid in ignorers:
continue

display_name = None
profile_info = room_members.get(uid)
Expand Down
119 changes: 90 additions & 29 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

import abc
import logging
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Set, Tuple

from synapse.api.constants import AccountDataTypes
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -287,23 +287,24 @@ def get_updated_account_data_for_user_txn(txn):
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
)

@cached(num_args=2, cache_context=True, max_entries=5000)
async def is_ignored_by(
self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext
) -> bool:
ignored_account_data = await self.get_global_account_data_by_type_for_user(
AccountDataTypes.IGNORED_USER_LIST,
ignorer_user_id,
on_invalidate=cache_context.invalidate,
)
if not ignored_account_data:
return False
@cached(max_entries=5000)
clokep marked this conversation as resolved.
Show resolved Hide resolved
async def ignored_by(self, user_id: str) -> Set[str]:
"""
Get users which ignore the given user.

try:
return ignored_user_id in ignored_account_data.get("ignored_users", {})
except TypeError:
# The type of the ignored_users field is invalid.
return False
Params:
user_id: The user ID which might be ignored.

Return:
The user IDs which ignore the given user.
"""
return set(
await self.db_pool.simple_select_one_onecol(
clokep marked this conversation as resolved.
Show resolved Hide resolved
table="ignored_users",
keyvalues={"ignored_user_id": user_id},
retcol="ignorer_user_id",
clokep marked this conversation as resolved.
Show resolved Hide resolved
)
)


class AccountDataStore(AccountDataWorkerStore):
Expand Down Expand Up @@ -390,18 +391,14 @@ async def add_account_data_for_user(
Returns:
The maximum stream ID.
"""
content_json = json_encoder.encode(content)

async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
await self.db_pool.simple_upsert(
desc="add_user_account_data",
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
await self.db_pool.runInteraction(
"add_user_account_data",
self._add_account_data_for_user,
next_id,
user_id,
account_data_type,
content,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI changing this from an upsert has non-obvious performance cost: simple_upsert will be run in autocommit mode which a) involves fewer round trips (as no BEGIN/COMMIT commands) and b) means it doesn't get run in REPEATABLE READ isolation level, which has overhead and more likely to conflict with other transactions. I don't think it matters in this case though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still doing a simple_upsert_txn internally, does that make sense or not really? Hopefully this won't conflict much since it would need to be a user conflicting with themself?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all correct currently, I just wanted to flag that calling simple_upsert is actually not the same as calling runInteraction and simple_upsert_txn from a perf PoV. I don't think we need to worry about it here though, as we're unlikely to see conflicts (which is where it really matters).

)

# it's theoretically possible for the above to succeed and the
Expand All @@ -424,6 +421,70 @@ async def add_account_data_for_user(

return self._account_data_id_gen.get_current_token()

def _add_account_data_for_user(
self,
txn,
next_id: int,
user_id: str,
account_data_type: str,
content: JsonDict,
) -> None:
content_json = json_encoder.encode(content)

# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
)

# Ignored users get denormalized into a separate table as an optimisation.
if account_data_type == AccountDataTypes.IGNORED_USER_LIST:
clokep marked this conversation as resolved.
Show resolved Hide resolved
# Insert / delete to sync the list of ignored users.
previously_ignored_users = set(
self.db_pool.simple_select_onecol_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
retcol="ignored_user_id",
)
)

# If the data is invalid, no one is ignored.
ignored_users_content = content.get("ignored_users", {})
if isinstance(ignored_users_content, dict):
currently_ignored_users = set(ignored_users_content)
else:
currently_ignored_users = set()
newly_ignored_users = currently_ignored_users - previously_ignored_users

# Delete entries which are no longer ignored.
self.db_pool.simple_delete_many_txn(
txn,
table="ignored_users",
column="ignored_user_id",
iterable=previously_ignored_users - currently_ignored_users,
keyvalues={"ignorer_user_id": user_id},
)

# Add entries which are newly ignored.
self.db_pool.simple_insert_many_txn(
txn,
table="ignored_users",
values=[
{"ignorer_user_id": user_id, "ignored_user_id": u}
for u in newly_ignored_users
],
)

# Invalidate the cache for newly ignored users.
for ignored_user_id in newly_ignored_users:
self.ignored_by.invalidate((ignored_user_id,))
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def _update_max_stream_id(self, next_id: int) -> None:
"""Update the max stream_id

Expand Down
67 changes: 67 additions & 0 deletions synapse/storage/databases/main/schema/delta/58/28ignored_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This migration denormalises the account_data table into an ignored users table.
"""

import logging
from io import StringIO

from synapse.storage._base import db_to_json
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import execute_statements_from_stream
from synapse.storage.types import Cursor

logger = logging.getLogger(__name__)


def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
insert_sql = """
INSERT INTO ignored_users (ignorer_user_id, ignored_user_id) VALUES (?, ?)
"""

logger.info("Converting existing ignore lists")
cur.execute(
"SELECT user_id, content FROM account_data WHERE account_data_type = 'm.ignored_user_list'"
)
for user_id, content_json in cur.fetchall():
content = db_to_json(content_json)

# The content should be the form of a dictionary with a key
# "ignored_users" pointing to a dictionary with keys of ignored users.
#
# { "ignored_users": "@someone:example.org": {} }
ignored_users = content.get("ignored_users", {})
if isinstance(ignored_users, dict) and ignored_users:
cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])


def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
logger.info("Creating ignored_users table")
execute_statements_from_stream(cur, StringIO(_create_commands))


# there might be duplicates, so the easiest way to achieve this is to create a new
# table with the right data, and renaming it into place

_create_commands = """
-- Users which are ignored when calculating push notifications. This data is
-- denormalized from account data.
CREATE TABLE IF NOT EXISTS ignored_users(
ignorer_user_id TEXT NOT NULL, -- The user ID of the user who is ignoring another user. (This is a local user.)
ignored_user_id TEXT NOT NULL, -- The user ID of the user who is being ignored. (This is a local or remote user.)
UNIQUE (ignorer_user_id, ignored_user_id)
);
"""
clokep marked this conversation as resolved.
Show resolved Hide resolved