From 1b4d5d6acf8cfbe65601b881360ea730f9693d80 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 6 Jan 2021 12:33:20 -0500 Subject: [PATCH 01/26] Empty iterables should count towards cache usage. (#9028) --- changelog.d/9028.bugfix | 1 + synapse/util/caches/deferred_cache.py | 2 +- tests/util/caches/test_deferred_cache.py | 73 ++++++++++++++++-------- 3 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 changelog.d/9028.bugfix diff --git a/changelog.d/9028.bugfix b/changelog.d/9028.bugfix new file mode 100644 index 000000000000..66666886a469 --- /dev/null +++ b/changelog.d/9028.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where some caches could grow larger than configured. diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 601305487c55..1adc92eb905f 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -105,7 +105,7 @@ def metrics_cb(): keylen=keylen, cache_name=name, cache_type=cache_type, - size_callback=(lambda d: len(d)) if iterable else None, + size_callback=(lambda d: len(d) or 1) if iterable else None, metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, ) # type: LruCache[KT, VT] diff --git a/tests/util/caches/test_deferred_cache.py b/tests/util/caches/test_deferred_cache.py index dadfabd46d1d..ecd9efc4dfc3 100644 --- a/tests/util/caches/test_deferred_cache.py +++ b/tests/util/caches/test_deferred_cache.py @@ -25,13 +25,8 @@ class DeferredCacheTestCase(TestCase): def test_empty(self): cache = DeferredCache("test") - failed = False - try: + with self.assertRaises(KeyError): cache.get("foo") - except KeyError: - failed = True - - self.assertTrue(failed) def test_hit(self): cache = DeferredCache("test") @@ -155,13 +150,8 @@ def test_invalidate(self): cache.prefill(("foo",), 123) cache.invalidate(("foo",)) - failed = False - try: + with self.assertRaises(KeyError): cache.get(("foo",)) - except KeyError: - failed = True - - self.assertTrue(failed) def test_invalidate_all(self): cache = DeferredCache("testcache") @@ -215,13 +205,8 @@ def test_eviction(self): cache.prefill(2, "two") cache.prefill(3, "three") # 1 will be evicted - failed = False - try: + with self.assertRaises(KeyError): cache.get(1) - except KeyError: - failed = True - - self.assertTrue(failed) cache.get(2) cache.get(3) @@ -239,13 +224,55 @@ def test_eviction_lru(self): cache.prefill(3, "three") - failed = False - try: + with self.assertRaises(KeyError): cache.get(2) - except KeyError: - failed = True - self.assertTrue(failed) + cache.get(1) + cache.get(3) + + def test_eviction_iterable(self): + cache = DeferredCache( + "test", max_entries=3, apply_cache_factor_from_config=False, iterable=True, + ) + + cache.prefill(1, ["one", "two"]) + cache.prefill(2, ["three"]) + # Now access 1 again, thus causing 2 to be least-recently used + cache.get(1) + + # Now add an item to the cache, which evicts 2. + cache.prefill(3, ["four"]) + with self.assertRaises(KeyError): + cache.get(2) + + # Ensure 1 & 3 are in the cache. cache.get(1) cache.get(3) + + # Now access 1 again, thus causing 3 to be least-recently used + cache.get(1) + + # Now add an item with multiple elements to the cache + cache.prefill(4, ["five", "six"]) + + # Both 1 and 3 are evicted since there's too many elements. + with self.assertRaises(KeyError): + cache.get(1) + with self.assertRaises(KeyError): + cache.get(3) + + # Now add another item to fill the cache again. 
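+        # (entry 4 already counts as two elements, so one more single-element
+        # entry brings the total back to the maximum of 3.)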
+ cache.prefill(5, ["seven"]) + + # Now access 4, thus causing 5 to be least-recently used + cache.get(4) + + # Add an empty item. + cache.prefill(6, []) + + # 5 gets evicted and replaced since an empty element counts as an item. + with self.assertRaises(KeyError): + cache.get(5) + cache.get(4) + cache.get(6) From eee3c3c52f59661a95f2f626edc97fac295817d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 11:33:36 +0000 Subject: [PATCH 02/26] Handle updating schema version without any deltas. (#9033) This can happen when using a split out state database and we've upgraded the schema version without there being any changes in the state schema. --- changelog.d/9033.misc | 1 + synapse/storage/prepare_database.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) create mode 100644 changelog.d/9033.misc diff --git a/changelog.d/9033.misc b/changelog.d/9033.misc new file mode 100644 index 000000000000..e9a305c0e8ea --- /dev/null +++ b/changelog.d/9033.misc @@ -0,0 +1 @@ +Allow bumping schema version when using split out state database. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index f91a2eae7aa4..6684403a0a77 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -375,7 +375,16 @@ def _upgrade_existing_database( specific_engine_extensions = (".sqlite", ".postgres") for v in range(start_ver, SCHEMA_VERSION + 1): - logger.info("Applying schema deltas for v%d", v) + if not is_worker: + logger.info("Applying schema deltas for v%d", v) + + cur.execute("DELETE FROM schema_version") + cur.execute( + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)", + (v, True), + ) + else: + logger.info("Checking schema deltas for v%d", v) # We need to search both the global and per data store schema # directories for schema updates. @@ -489,12 +498,6 @@ def _upgrade_existing_database( (v, relative_path), ) - cur.execute("DELETE FROM schema_version") - cur.execute( - "INSERT INTO schema_version (version, upgraded) VALUES (?,?)", - (v, True), - ) - logger.info("Schema now up to date") From 8d3d264052adffaf9ef36a6d9235aeeb8bef5fb5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 7 Jan 2021 11:41:28 +0000 Subject: [PATCH 03/26] Skip unit tests which require optional dependencies (#9031) If we are lacking an optional dependency, skip the tests that rely on it. --- changelog.d/9031.misc | 1 + tests/handlers/test_oidc.py | 19 ++++++++++++++++- tests/rest/client/v1/test_login.py | 11 +++++++++- tests/rest/client/v2_alpha/test_auth.py | 26 ++++++++++++++--------- tests/rest/media/v1/test_url_preview.py | 7 +++++++ tests/test_preview.py | 11 ++++++++++ tests/unittest.py | 28 ++++++++++++++++++++++++- 7 files changed, 90 insertions(+), 13 deletions(-) create mode 100644 changelog.d/9031.misc diff --git a/changelog.d/9031.misc b/changelog.d/9031.misc new file mode 100644 index 000000000000..f43611c38598 --- /dev/null +++ b/changelog.d/9031.misc @@ -0,0 +1 @@ +Fix running unit tests when optional dependencies are not installed. 
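Note: the guards in this patch all rely on trial's `skip` attribute: setting
`skip` on a test method or TestCase class makes the runner report it as
skipped, with the given string as the reason. A minimal sketch of the pattern
(`foo` and `HAS_FOO` are illustrative names, not part of this patch):

    try:
        import foo  # noqa: F401

        HAS_FOO = True
    except ImportError:
        HAS_FOO = False


    class MyTestCase(TestCase):
        if not HAS_FOO:
            skip = "requires foo"  # trial skips every test in the class

The `skip_unless` decorator added to tests/unittest.py below applies the same
attribute to individual test methods.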
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index 368d600b33f2..f5df65781433 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -24,7 +24,6 @@ from twisted.web.resource import Resource from synapse.api.errors import RedirectException -from synapse.handlers.oidc_handler import OidcError from synapse.handlers.sso import MappingException from synapse.rest.client.v1 import login from synapse.rest.synapse.client.pick_username import pick_username_resource @@ -34,6 +33,14 @@ from tests.test_utils import FakeResponse, simple_async_mock from tests.unittest import HomeserverTestCase, override_config +try: + import authlib # noqa: F401 + + HAS_OIDC = True +except ImportError: + HAS_OIDC = False + + # These are a few constants that are used as config parameters in the tests. ISSUER = "https://issuer/" CLIENT_ID = "test-client-id" @@ -113,6 +120,9 @@ async def get_json(url): class OidcHandlerTestCase(HomeserverTestCase): + if not HAS_OIDC: + skip = "requires OIDC" + def default_config(self): config = super().default_config() config["public_baseurl"] = BASE_URL @@ -458,6 +468,8 @@ def test_callback(self): self.assertRenderedError("fetch_error") # Handle code exchange failure + from synapse.handlers.oidc_handler import OidcError + self.handler._exchange_code = simple_async_mock( raises=OidcError("invalid_request") ) @@ -538,6 +550,8 @@ def test_exchange_code(self): body=b'{"error": "foo", "error_description": "bar"}', ) ) + from synapse.handlers.oidc_handler import OidcError + exc = self.get_failure(self.handler._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "foo") self.assertEqual(exc.value.error_description, "bar") @@ -829,6 +843,9 @@ def test_null_localpart(self): class UsernamePickerTestCase(HomeserverTestCase): + if not HAS_OIDC: + skip = "requires OIDC" + servlets = [login.register_servlets] def default_config(self): diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py index 999d62831578..901c72d36a10 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py @@ -4,7 +4,10 @@ from mock import Mock -import jwt +try: + import jwt +except ImportError: + jwt = None import synapse.rest.admin from synapse.appservice import ApplicationService @@ -460,6 +463,9 @@ def test_deactivated_user(self): class JWTTestCase(unittest.HomeserverTestCase): + if not jwt: + skip = "requires jwt" + servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, login.register_servlets, @@ -628,6 +634,9 @@ def test_login_no_token(self): # RSS256, with a public key configured in synapse as "jwt_secret", and tokens # signed by the private key. 
class JWTPubKeyTestCase(unittest.HomeserverTestCase): + if not jwt: + skip = "requires jwt" + servlets = [ login.register_servlets, ] diff --git a/tests/rest/client/v2_alpha/test_auth.py b/tests/rest/client/v2_alpha/test_auth.py index ac66a4e0b71c..bb91e0c33143 100644 --- a/tests/rest/client/v2_alpha/test_auth.py +++ b/tests/rest/client/v2_alpha/test_auth.py @@ -26,8 +26,10 @@ from synapse.types import JsonDict, UserID from tests import unittest +from tests.handlers.test_oidc import HAS_OIDC from tests.rest.client.v1.utils import TEST_OIDC_CONFIG from tests.server import FakeChannel +from tests.unittest import override_config, skip_unless class DummyRecaptchaChecker(UserInteractiveAuthChecker): @@ -158,20 +160,22 @@ class UIAuthTests(unittest.HomeserverTestCase): def default_config(self): config = super().default_config() + config["public_baseurl"] = "https://synapse.test" - # we enable OIDC as a way of testing SSO flows - oidc_config = {} - oidc_config.update(TEST_OIDC_CONFIG) - oidc_config["allow_existing_users"] = True + if HAS_OIDC: + # we enable OIDC as a way of testing SSO flows + oidc_config = {} + oidc_config.update(TEST_OIDC_CONFIG) + oidc_config["allow_existing_users"] = True + config["oidc_config"] = oidc_config - config["oidc_config"] = oidc_config - config["public_baseurl"] = "https://synapse.test" return config def create_resource_dict(self): resource_dict = super().create_resource_dict() - # mount the OIDC resource at /_synapse/oidc - resource_dict["/_synapse/oidc"] = OIDCResource(self.hs) + if HAS_OIDC: + # mount the OIDC resource at /_synapse/oidc + resource_dict["/_synapse/oidc"] = OIDCResource(self.hs) return resource_dict def prepare(self, reactor, clock, hs): @@ -380,6 +384,8 @@ def test_can_reuse_session(self): # Note that *no auth* information is provided, not even a session iD! self.delete_device(self.user_tok, self.device_id, 200) + @skip_unless(HAS_OIDC, "requires OIDC") + @override_config({"oidc_config": TEST_OIDC_CONFIG}) def test_does_not_offer_password_for_sso_user(self): login_resp = self.helper.login_via_oidc("username") user_tok = login_resp["access_token"] @@ -393,13 +399,13 @@ def test_does_not_offer_password_for_sso_user(self): self.assertEqual(flows, [{"stages": ["m.login.sso"]}]) def test_does_not_offer_sso_for_password_user(self): - # now call the device deletion API: we should get the option to auth with SSO - # and not password. channel = self.delete_device(self.user_tok, self.device_id, 401) flows = channel.json_body["flows"] self.assertEqual(flows, [{"stages": ["m.login.password"]}]) + @skip_unless(HAS_OIDC, "requires OIDC") + @override_config({"oidc_config": TEST_OIDC_CONFIG}) def test_offers_both_flows_for_upgraded_user(self): """A user that had a password and then logged in with SSO should get both flows """ diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py index 83d728b4a45b..696850243391 100644 --- a/tests/rest/media/v1/test_url_preview.py +++ b/tests/rest/media/v1/test_url_preview.py @@ -26,8 +26,15 @@ from tests import unittest from tests.server import FakeTransport +try: + import lxml +except ImportError: + lxml = None + class URLPreviewTests(unittest.HomeserverTestCase): + if not lxml: + skip = "url preview feature requires lxml" hijack_auth = True user_id = "@test:user" diff --git a/tests/test_preview.py b/tests/test_preview.py index a883d707df28..c19facc1cb70 100644 --- a/tests/test_preview.py +++ b/tests/test_preview.py @@ -20,8 +20,16 @@ from . 
import unittest +try: + import lxml +except ImportError: + lxml = None + class PreviewTestCase(unittest.TestCase): + if not lxml: + skip = "url preview feature requires lxml" + def test_long_summarize(self): example_paras = [ """Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami: @@ -137,6 +145,9 @@ def test_small_then_large_summarize(self): class PreviewUrlTestCase(unittest.TestCase): + if not lxml: + skip = "url preview feature requires lxml" + def test_simple(self): html = """ diff --git a/tests/unittest.py b/tests/unittest.py index af7f752c5a69..bbd295687c91 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -20,7 +20,7 @@ import inspect import logging import time -from typing import Dict, Iterable, Optional, Tuple, Type, TypeVar, Union +from typing import Callable, Dict, Iterable, Optional, Tuple, Type, TypeVar, Union from mock import Mock, patch @@ -736,3 +736,29 @@ def decorator(func): return func return decorator + + +TV = TypeVar("TV") + + +def skip_unless(condition: bool, reason: str) -> Callable[[TV], TV]: + """A test decorator which will skip the decorated test unless a condition is set + + For example: + + class MyTestCase(TestCase): + @skip_unless(HAS_FOO, "Cannot test without foo") + def test_foo(self): + ... + + Args: + condition: If true, the test will be skipped + reason: the reason to give for skipping the test + """ + + def decorator(f: TV) -> TV: + if not condition: + f.skip = reason # type: ignore + return f + + return decorator From 1d5c021a45465f2926f70bddfd64b79bc018a7da Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 7 Jan 2021 11:41:54 +0000 Subject: [PATCH 04/26] tox: Add a -noextras factor (#9030) ... for running the tests with no optional deps. --- changelog.d/9030.misc | 1 + tox.ini | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 changelog.d/9030.misc diff --git a/changelog.d/9030.misc b/changelog.d/9030.misc new file mode 100644 index 000000000000..267cfbf9f9d4 --- /dev/null +++ b/changelog.d/9030.misc @@ -0,0 +1 @@ +Add a `-noextras` factor to `tox.ini`, to support running the tests with no optional dependencies. diff --git a/tox.ini b/tox.ini index 8e8b49529246..ab4ae295a96f 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,6 @@ envlist = packaging, py35, py36, py37, py38, py39, check_codestyle, check_isort [base] -extras = test deps = python-subunit junitxml @@ -25,10 +24,15 @@ deps = # install the "enum34" dependency of cryptography. 
pip>=10 +# default settings for all tox environments [testenv] deps = {[base]deps} -extras = all, test +extras = + # install the optional dependendencies for tox environments without + # '-noextras' in their name + !noextras: all + test setenv = # use a postgres db for tox environments with "-postgres" in the name From 3fc2399dbe0fa141e3bac4b45686eceb9f8d086e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jan 2021 12:16:23 +0000 Subject: [PATCH 05/26] black-format tests/rest/client/v1/test_login.py black seems to want to reformat this, despite `black --check` being happy with it :/ --- tests/rest/client/v1/test_login.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py index 901c72d36a10..3b82511d97b5 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py @@ -413,8 +413,7 @@ def test_cas_redirect_confirm(self): } ) def test_cas_redirect_whitelisted(self): - """Tests that the SSO login flow serves a redirect to a whitelisted url - """ + """Tests that the SSO login flow serves a redirect to a whitelisted url""" self._test_redirect("https://legit-site.com/") @override_config({"public_baseurl": "https://example.com"}) @@ -773,8 +772,7 @@ def make_homeserver(self, reactor, clock): return self.hs def test_login_appservice_user(self): - """Test that an appservice user can use /login - """ + """Test that an appservice user can use /login""" self.register_as_user(AS_USER) params = { @@ -788,8 +786,7 @@ def test_login_appservice_user(self): self.assertEquals(channel.result["code"], b"200", channel.result) def test_login_appservice_user_bot(self): - """Test that the appservice bot can use /login - """ + """Test that the appservice bot can use /login""" self.register_as_user(AS_USER) params = { @@ -803,8 +800,7 @@ def test_login_appservice_user_bot(self): self.assertEquals(channel.result["code"], b"200", channel.result) def test_login_appservice_wrong_user(self): - """Test that non-as users cannot login with the as token - """ + """Test that non-as users cannot login with the as token""" self.register_as_user(AS_USER) params = { @@ -818,8 +814,7 @@ def test_login_appservice_wrong_user(self): self.assertEquals(channel.result["code"], b"403", channel.result) def test_login_appservice_wrong_as(self): - """Test that as users cannot login with wrong as token - """ + """Test that as users cannot login with wrong as token""" self.register_as_user(AS_USER) params = { @@ -834,7 +829,7 @@ def test_login_appservice_wrong_as(self): def test_login_appservice_no_token(self): """Test that users must provide a token when using the appservice - login method + login method """ self.register_as_user(AS_USER) From 23d701864fedbc40863b34c9c46c134295dd0a35 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 7 Jan 2021 08:03:38 -0500 Subject: [PATCH 06/26] Improve the performance of calculating ignored users in large rooms (#9024) This allows for efficiently finding which users ignore a particular user. 
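In effect, a per-(ignorer, ignored) check against each user's account data is
replaced by a single indexed query against the new `ignored_users` table,
roughly:

    SELECT ignorer_user_id FROM ignored_users WHERE ignored_user_id = ?;

which lets the bulk push rule evaluator fetch all ignorers of an event's
sender in one round trip.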
Co-authored-by: Erik Johnston --- changelog.d/9024.feature | 1 + synapse/push/bulk_push_rule_evaluator.py | 12 +- .../storage/databases/main/account_data.py | 121 +++++++++++++----- .../main/schema/delta/59/01ignored_user.py | 82 ++++++++++++ synapse/storage/prepare_database.py | 2 +- tests/storage/test_account_data.py | 120 +++++++++++++++++ 6 files changed, 304 insertions(+), 34 deletions(-) create mode 100644 changelog.d/9024.feature create mode 100644 synapse/storage/databases/main/schema/delta/59/01ignored_user.py create mode 100644 tests/storage/test_account_data.py diff --git a/changelog.d/9024.feature b/changelog.d/9024.feature new file mode 100644 index 000000000000..073dafbf834e --- /dev/null +++ b/changelog.d/9024.feature @@ -0,0 +1 @@ +Improved performance when calculating ignored users in large rooms. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 10f27e4378d7..9018f9e20bd2 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -203,14 +203,18 @@ async def action_for_event_by_user( condition_cache = {} # type: Dict[str, bool] + # If the event is not a state event check if any users ignore the sender. + if not event.is_state(): + ignorers = await self.store.ignored_by(event.sender) + else: + ignorers = set() + for uid, rules in rules_by_user.items(): if event.sender == uid: continue - if not event.is_state(): - is_ignored = await self.store.is_ignored_by(event.sender, uid) - if is_ignored: - continue + if uid in ignorers: + continue display_name = None profile_info = room_members.get(uid) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 49ee23470d61..bff51e92b9d1 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -16,7 +16,7 @@ import abc import logging -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Set, Tuple from synapse.api.constants import AccountDataTypes from synapse.storage._base import SQLBaseStore, db_to_json @@ -24,7 +24,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder -from synapse.util.caches.descriptors import _CacheContext, cached +from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -287,23 +287,25 @@ def get_updated_account_data_for_user_txn(txn): "get_updated_account_data_for_user", get_updated_account_data_for_user_txn ) - @cached(num_args=2, cache_context=True, max_entries=5000) - async def is_ignored_by( - self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext - ) -> bool: - ignored_account_data = await self.get_global_account_data_by_type_for_user( - AccountDataTypes.IGNORED_USER_LIST, - ignorer_user_id, - on_invalidate=cache_context.invalidate, - ) - if not ignored_account_data: - return False + @cached(max_entries=5000, iterable=True) + async def ignored_by(self, user_id: str) -> Set[str]: + """ + Get users which ignore the given user. - try: - return ignored_user_id in ignored_account_data.get("ignored_users", {}) - except TypeError: - # The type of the ignored_users field is invalid. - return False + Params: + user_id: The user ID which might be ignored. + + Return: + The user IDs which ignore the given user. 
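+
+        Note:
+            This is cached with iterable=True, so each user ID in the returned
+            set counts towards the cache size, rather than the set counting as
+            a single entry.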
+ """ + return set( + await self.db_pool.simple_select_onecol( + table="ignored_users", + keyvalues={"ignored_user_id": user_id}, + retcol="ignorer_user_id", + desc="ignored_by", + ) + ) class AccountDataStore(AccountDataWorkerStore): @@ -390,18 +392,14 @@ async def add_account_data_for_user( Returns: The maximum stream ID. """ - content_json = json_encoder.encode(content) - async with self._account_data_id_gen.get_next() as next_id: - # no need to lock here as account_data has a unique constraint on - # (user_id, account_data_type) so simple_upsert will retry if - # there is a conflict. - await self.db_pool.simple_upsert( - desc="add_user_account_data", - table="account_data", - keyvalues={"user_id": user_id, "account_data_type": account_data_type}, - values={"stream_id": next_id, "content": content_json}, - lock=False, + await self.db_pool.runInteraction( + "add_user_account_data", + self._add_account_data_for_user, + next_id, + user_id, + account_data_type, + content, ) # it's theoretically possible for the above to succeed and the @@ -424,6 +422,71 @@ async def add_account_data_for_user( return self._account_data_id_gen.get_current_token() + def _add_account_data_for_user( + self, + txn, + next_id: int, + user_id: str, + account_data_type: str, + content: JsonDict, + ) -> None: + content_json = json_encoder.encode(content) + + # no need to lock here as account_data has a unique constraint on + # (user_id, account_data_type) so simple_upsert will retry if + # there is a conflict. + self.db_pool.simple_upsert_txn( + txn, + table="account_data", + keyvalues={"user_id": user_id, "account_data_type": account_data_type}, + values={"stream_id": next_id, "content": content_json}, + lock=False, + ) + + # Ignored users get denormalized into a separate table as an optimisation. + if account_data_type != AccountDataTypes.IGNORED_USER_LIST: + return + + # Insert / delete to sync the list of ignored users. + previously_ignored_users = set( + self.db_pool.simple_select_onecol_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + retcol="ignored_user_id", + ) + ) + + # If the data is invalid, no one is ignored. + ignored_users_content = content.get("ignored_users", {}) + if isinstance(ignored_users_content, dict): + currently_ignored_users = set(ignored_users_content) + else: + currently_ignored_users = set() + + # Delete entries which are no longer ignored. + self.db_pool.simple_delete_many_txn( + txn, + table="ignored_users", + column="ignored_user_id", + iterable=previously_ignored_users - currently_ignored_users, + keyvalues={"ignorer_user_id": user_id}, + ) + + # Add entries which are newly ignored. + self.db_pool.simple_insert_many_txn( + txn, + table="ignored_users", + values=[ + {"ignorer_user_id": user_id, "ignored_user_id": u} + for u in currently_ignored_users - previously_ignored_users + ], + ) + + # Invalidate the cache for any ignored users which were added or removed. + for ignored_user_id in previously_ignored_users ^ currently_ignored_users: + self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) + async def _update_max_stream_id(self, next_id: int) -> None: """Update the max stream_id diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py new file mode 100644 index 000000000000..f35c70b69908 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py @@ -0,0 +1,82 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This migration denormalises the account_data table into an ignored users table. +""" + +import logging +from io import StringIO + +from synapse.storage._base import db_to_json +from synapse.storage.engines import BaseDatabaseEngine +from synapse.storage.prepare_database import execute_statements_from_stream +from synapse.storage.types import Cursor + +logger = logging.getLogger(__name__) + + +def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + pass + + +def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + logger.info("Creating ignored_users table") + execute_statements_from_stream(cur, StringIO(_create_commands)) + + # We now upgrade existing data, if any. We don't do this in `run_upgrade` as + # we a) want to run these before adding constraints and b) `run_upgrade` is + # not run on empty databases. + insert_sql = """ + INSERT INTO ignored_users (ignorer_user_id, ignored_user_id) VALUES (?, ?) + """ + + logger.info("Converting existing ignore lists") + cur.execute( + "SELECT user_id, content FROM account_data WHERE account_data_type = 'm.ignored_user_list'" + ) + for user_id, content_json in cur.fetchall(): + content = db_to_json(content_json) + + # The content should be the form of a dictionary with a key + # "ignored_users" pointing to a dictionary with keys of ignored users. + # + # { "ignored_users": "@someone:example.org": {} } + ignored_users = content.get("ignored_users", {}) + if isinstance(ignored_users, dict) and ignored_users: + cur.executemany(insert_sql, [(user_id, u) for u in ignored_users]) + + # Add indexes after inserting data for efficiency. + logger.info("Adding constraints to ignored_users table") + execute_statements_from_stream(cur, StringIO(_constraints_commands)) + + +# there might be duplicates, so the easiest way to achieve this is to create a new +# table with the right data, and renaming it into place + +_create_commands = """ +-- Users which are ignored when calculating push notifications. This data is +-- denormalized from account data. +CREATE TABLE IF NOT EXISTS ignored_users( + ignorer_user_id TEXT NOT NULL, -- The user ID of the user who is ignoring another user. (This is a local user.) + ignored_user_id TEXT NOT NULL -- The user ID of the user who is being ignored. (This is a local or remote user.) +); +""" + +_constraints_commands = """ +CREATE UNIQUE INDEX ignored_users_uniqueness ON ignored_users (ignorer_user_id, ignored_user_id); + +-- Add an index on ignored_users since look-ups are done to get all ignorers of an ignored user. 
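+-- (The unique index above leads on ignorer_user_id, so it cannot serve
+-- queries which filter on ignored_user_id alone.)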
+CREATE INDEX ignored_users_ignored_user_id ON ignored_users (ignored_user_id); +""" diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6684403a0a77..01efb2cabb7f 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -38,7 +38,7 @@ # XXX: If you're about to bump this to 59 (or higher) please create an update # that drops the unused `cache_invalidation_stream` table, as per #7436! # XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! -SCHEMA_VERSION = 58 +SCHEMA_VERSION = 59 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/tests/storage/test_account_data.py b/tests/storage/test_account_data.py new file mode 100644 index 000000000000..673e1fe3e339 --- /dev/null +++ b/tests/storage/test_account_data.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Iterable, Set + +from synapse.api.constants import AccountDataTypes + +from tests import unittest + + +class IgnoredUsersTestCase(unittest.HomeserverTestCase): + def prepare(self, hs, reactor, clock): + self.store = self.hs.get_datastore() + self.user = "@user:test" + + def _update_ignore_list( + self, *ignored_user_ids: Iterable[str], ignorer_user_id: str = None + ) -> None: + """Update the account data to block the given users.""" + if ignorer_user_id is None: + ignorer_user_id = self.user + + self.get_success( + self.store.add_account_data_for_user( + ignorer_user_id, + AccountDataTypes.IGNORED_USER_LIST, + {"ignored_users": {u: {} for u in ignored_user_ids}}, + ) + ) + + def assert_ignorers( + self, ignored_user_id: str, expected_ignorer_user_ids: Set[str] + ) -> None: + self.assertEqual( + self.get_success(self.store.ignored_by(ignored_user_id)), + expected_ignorer_user_ids, + ) + + def test_ignoring_users(self): + """Basic adding/removing of users from the ignore list.""" + self._update_ignore_list("@other:test", "@another:remote") + + # Check a user which no one ignores. + self.assert_ignorers("@user:test", set()) + + # Check a local user which is ignored. + self.assert_ignorers("@other:test", {self.user}) + + # Check a remote user which is ignored. + self.assert_ignorers("@another:remote", {self.user}) + + # Add one user, remove one user, and leave one user. + self._update_ignore_list("@foo:test", "@another:remote") + + # Check the removed user. + self.assert_ignorers("@other:test", set()) + + # Check the added user. + self.assert_ignorers("@foo:test", {self.user}) + + # Check the removed user. + self.assert_ignorers("@another:remote", {self.user}) + + def test_caching(self): + """Ensure that caching works properly between different users.""" + # The first user ignores a user. + self._update_ignore_list("@other:test") + self.assert_ignorers("@other:test", {self.user}) + + # The second user ignores them. 
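+        # (the cached ignored_by entry for "@other:test" must be invalidated
+        # so that it picks up both ignorers.)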
+ self._update_ignore_list("@other:test", ignorer_user_id="@second:test") + self.assert_ignorers("@other:test", {self.user, "@second:test"}) + + # The first user un-ignores them. + self._update_ignore_list() + self.assert_ignorers("@other:test", {"@second:test"}) + + def test_invalid_data(self): + """Invalid data ends up clearing out the ignored users list.""" + # Add some data and ensure it is there. + self._update_ignore_list("@other:test") + self.assert_ignorers("@other:test", {self.user}) + + # No ignored_users key. + self.get_success( + self.store.add_account_data_for_user( + self.user, AccountDataTypes.IGNORED_USER_LIST, {}, + ) + ) + + # No one ignores the user now. + self.assert_ignorers("@other:test", set()) + + # Add some data and ensure it is there. + self._update_ignore_list("@other:test") + self.assert_ignorers("@other:test", {self.user}) + + # Invalid data. + self.get_success( + self.store.add_account_data_for_user( + self.user, + AccountDataTypes.IGNORED_USER_LIST, + {"ignored_users": "unexpected"}, + ) + ) + + # No one ignores the user now. + self.assert_ignorers("@other:test", set()) From bbd04441edc414c161748e1499961c6a68a460aa Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 6 Jan 2021 15:51:18 +0000 Subject: [PATCH 07/26] Fix type hints in test_login.py --- mypy.ini | 1 + tests/rest/client/v1/test_login.py | 78 +++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 24 deletions(-) diff --git a/mypy.ini b/mypy.ini index 5d15b7bf1cae..b996867121e5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -103,6 +103,7 @@ files = tests/replication, tests/test_utils, tests/handlers/test_password_providers.py, + tests/rest/client/v1/test_login.py, tests/rest/client/v2_alpha/test_auth.py, tests/util/test_stream_change_cache.py diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py index 3b82511d97b5..b5a2ac23d4cd 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py @@ -1,14 +1,24 @@ -import json +# -*- coding: utf-8 -*- +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import time import urllib.parse +from typing import Any, Dict, Union from mock import Mock -try: - import jwt -except ImportError: - jwt = None - import synapse.rest.admin from synapse.appservice import ApplicationService from synapse.rest.client.v1 import login, logout @@ -16,7 +26,33 @@ from synapse.rest.client.v2_alpha.account import WhoamiRestServlet from tests import unittest -from tests.unittest import override_config +from tests.unittest import override_config, skip_unless + +try: + import jwt + + HAS_JWT = True +except ImportError: + HAS_JWT = False + + +# public_base_url used in some tests +BASE_URL = "https://synapse/" + +# CAS server used in some tests +CAS_SERVER = "https://fake.test" + +# just enough to tell pysaml2 where to redirect to +SAML_SERVER = "https://test.saml.server/idp/sso" +TEST_SAML_METADATA = """ + + + + + +""" % { + "SAML_SERVER": SAML_SERVER, +} LOGIN_URL = b"/_matrix/client/r0/login" TEST_URL = b"/_matrix/client/r0/account/whoami" @@ -461,10 +497,8 @@ def test_deactivated_user(self): self.assertIn(b"SSO account deactivated", channel.result["body"]) +@skip_unless(HAS_JWT, "requires jwt") class JWTTestCase(unittest.HomeserverTestCase): - if not jwt: - skip = "requires jwt" - servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, login.register_servlets, @@ -480,17 +514,17 @@ def make_homeserver(self, reactor, clock): self.hs.config.jwt_algorithm = self.jwt_algorithm return self.hs - def jwt_encode(self, token: str, secret: str = jwt_secret) -> str: + def jwt_encode(self, payload: Dict[str, Any], secret: str = jwt_secret) -> str: # PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str. - result = jwt.encode(token, secret, self.jwt_algorithm) + result = jwt.encode( + payload, secret, self.jwt_algorithm + ) # type: Union[str, bytes] if isinstance(result, bytes): return result.decode("ascii") return result def jwt_login(self, *args): - params = json.dumps( - {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)} - ) + params = {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)} channel = self.make_request(b"POST", LOGIN_URL, params) return channel @@ -622,7 +656,7 @@ def test_login_aud_no_config(self): ) def test_login_no_token(self): - params = json.dumps({"type": "org.matrix.login.jwt"}) + params = {"type": "org.matrix.login.jwt"} channel = self.make_request(b"POST", LOGIN_URL, params) self.assertEqual(channel.result["code"], b"403", channel.result) self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") @@ -632,10 +666,8 @@ def test_login_no_token(self): # The JWTPubKeyTestCase is a complement to JWTTestCase where we instead use # RSS256, with a public key configured in synapse as "jwt_secret", and tokens # signed by the private key. +@skip_unless(HAS_JWT, "requires jwt") class JWTPubKeyTestCase(unittest.HomeserverTestCase): - if not jwt: - skip = "requires jwt" - servlets = [ login.register_servlets, ] @@ -692,17 +724,15 @@ def make_homeserver(self, reactor, clock): self.hs.config.jwt_algorithm = "RS256" return self.hs - def jwt_encode(self, token: str, secret: str = jwt_privatekey) -> str: + def jwt_encode(self, payload: Dict[str, Any], secret: str = jwt_privatekey) -> str: # PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str. 
- result = jwt.encode(token, secret, "RS256") + result = jwt.encode(payload, secret, "RS256") # type: Union[bytes,str] if isinstance(result, bytes): return result.decode("ascii") return result def jwt_login(self, *args): - params = json.dumps( - {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)} - ) + params = {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)} channel = self.make_request(b"POST", LOGIN_URL, params) return channel From 8a910f97a47edb398acba2ad87805e4593c76139 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 6 Jan 2021 16:16:16 +0000 Subject: [PATCH 08/26] Add some tests for the IDP picker flow --- synapse/rest/client/v1/login.py | 4 +- tests/rest/client/v1/test_login.py | 191 ++++++++++++++++++++++++++++- tests/rest/client/v1/utils.py | 3 +- 3 files changed, 193 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index ebc346105b8f..be938df962f4 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -319,9 +319,9 @@ def __init__(self, hs: "HomeServer"): # register themselves with the main SSOHandler. if hs.config.cas_enabled: hs.get_cas_handler() - elif hs.config.saml2_enabled: + if hs.config.saml2_enabled: hs.get_saml_handler() - elif hs.config.oidc_enabled: + if hs.config.oidc_enabled: hs.get_oidc_handler() self._sso_handler = hs.get_sso_handler() diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py index b5a2ac23d4cd..1d1dc9f8a2fc 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py @@ -15,17 +15,26 @@ import time import urllib.parse -from typing import Any, Dict, Union +from html.parser import HTMLParser +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from mock import Mock +import pymacaroons + +from twisted.web.resource import Resource + import synapse.rest.admin from synapse.appservice import ApplicationService from synapse.rest.client.v1 import login, logout from synapse.rest.client.v2_alpha import devices, register from synapse.rest.client.v2_alpha.account import WhoamiRestServlet +from synapse.rest.synapse.client.pick_idp import PickIdpResource from tests import unittest +from tests.handlers.test_oidc import HAS_OIDC +from tests.handlers.test_saml import has_saml2 +from tests.rest.client.v1.utils import TEST_OIDC_AUTH_ENDPOINT, TEST_OIDC_CONFIG from tests.unittest import override_config, skip_unless try: @@ -350,6 +359,184 @@ def test_session_can_hard_logout_all_sessions_after_being_soft_logged_out(self): self.assertEquals(channel.result["code"], b"200", channel.result) +@skip_unless(has_saml2 and HAS_OIDC, "Requires SAML2 and OIDC") +class MultiSSOTestCase(unittest.HomeserverTestCase): + """Tests for homeservers with multiple SSO providers enabled""" + + servlets = [ + login.register_servlets, + ] + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + + config["public_baseurl"] = BASE_URL + + config["cas_config"] = { + "enabled": True, + "server_url": CAS_SERVER, + "service_url": "https://matrix.goodserver.com:8448", + } + + config["saml2_config"] = { + "sp_config": { + "metadata": {"inline": [TEST_SAML_METADATA]}, + # use the XMLSecurity backend to avoid relying on xmlsec1 + "crypto_backend": "XMLSecurity", + }, + } + + config["oidc_config"] = TEST_OIDC_CONFIG + + return config + + def create_resource_dict(self) -> Dict[str, Resource]: + d = super().create_resource_dict() + d["/_synapse/client/pick_idp"] = 
PickIdpResource(self.hs) + return d + + def test_multi_sso_redirect(self): + """/login/sso/redirect should redirect to an identity picker""" + client_redirect_url = "https://x?" + + # first hit the redirect url, which should redirect to our idp picker + channel = self.make_request( + "GET", + "/_matrix/client/r0/login/sso/redirect?redirectUrl=" + client_redirect_url, + ) + self.assertEqual(channel.code, 302, channel.result) + uri = channel.headers.getRawHeaders("Location")[0] + + # hitting that picker should give us some HTML + channel = self.make_request("GET", uri) + self.assertEqual(channel.code, 200, channel.result) + + # parse the form to check it has fields assumed elsewhere in this class + class FormPageParser(HTMLParser): + def __init__(self): + super().__init__() + + # the values of the hidden inputs: map from name to value + self.hiddens = {} # type: Dict[str, Optional[str]] + + # the values of the radio buttons + self.radios = [] # type: List[Optional[str]] + + def handle_starttag( + self, tag: str, attrs: Iterable[Tuple[str, Optional[str]]] + ) -> None: + attr_dict = dict(attrs) + if tag == "input": + if attr_dict["type"] == "radio" and attr_dict["name"] == "idp": + self.radios.append(attr_dict["value"]) + elif attr_dict["type"] == "hidden": + input_name = attr_dict["name"] + assert input_name + self.hiddens[input_name] = attr_dict["value"] + + def error(_, message): + self.fail(message) + + p = FormPageParser() + p.feed(channel.result["body"].decode("utf-8")) + p.close() + + self.assertCountEqual(p.radios, ["cas", "oidc", "saml"]) + + self.assertEqual(p.hiddens["redirectUrl"], client_redirect_url) + + def test_multi_sso_redirect_to_cas(self): + """If CAS is chosen, should redirect to the CAS server""" + client_redirect_url = "https://x?" + + channel = self.make_request( + "GET", + "/_synapse/client/pick_idp?redirectUrl=" + client_redirect_url + "&idp=cas", + shorthand=False, + ) + self.assertEqual(channel.code, 302, channel.result) + cas_uri = channel.headers.getRawHeaders("Location")[0] + cas_uri_path, cas_uri_query = cas_uri.split("?", 1) + + # it should redirect us to the login page of the cas server + self.assertEqual(cas_uri_path, CAS_SERVER + "/login") + + # check that the redirectUrl is correctly encoded in the service param - ie, the + # place that CAS will redirect to + cas_uri_params = urllib.parse.parse_qs(cas_uri_query) + service_uri = cas_uri_params["service"][0] + _, service_uri_query = service_uri.split("?", 1) + service_uri_params = urllib.parse.parse_qs(service_uri_query) + self.assertEqual(service_uri_params["redirectUrl"][0], client_redirect_url) + + def test_multi_sso_redirect_to_saml(self): + """If SAML is chosen, should redirect to the SAML server""" + client_redirect_url = "https://x?" 
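+        # (this URL should be carried through the SAML flow via RelayState,
+        # which is asserted on below.)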
+ + channel = self.make_request( + "GET", + "/_synapse/client/pick_idp?redirectUrl=" + + client_redirect_url + + "&idp=saml", + ) + self.assertEqual(channel.code, 302, channel.result) + saml_uri = channel.headers.getRawHeaders("Location")[0] + saml_uri_path, saml_uri_query = saml_uri.split("?", 1) + + # it should redirect us to the login page of the SAML server + self.assertEqual(saml_uri_path, SAML_SERVER) + + # the RelayState is used to carry the client redirect url + saml_uri_params = urllib.parse.parse_qs(saml_uri_query) + relay_state_param = saml_uri_params["RelayState"][0] + self.assertEqual(relay_state_param, client_redirect_url) + + def test_multi_sso_redirect_to_oidc(self): + """If OIDC is chosen, should redirect to the OIDC auth endpoint""" + client_redirect_url = "https://x?" + + channel = self.make_request( + "GET", + "/_synapse/client/pick_idp?redirectUrl=" + + client_redirect_url + + "&idp=oidc", + ) + self.assertEqual(channel.code, 302, channel.result) + oidc_uri = channel.headers.getRawHeaders("Location")[0] + oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1) + + # it should redirect us to the auth page of the OIDC server + self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT) + + # ... and should have set a cookie including the redirect url + cookies = dict( + h.split(";")[0].split("=", maxsplit=1) + for h in channel.headers.getRawHeaders("Set-Cookie") + ) + + oidc_session_cookie = cookies["oidc_session"] + macaroon = pymacaroons.Macaroon.deserialize(oidc_session_cookie) + self.assertEqual( + self._get_value_from_macaroon(macaroon, "client_redirect_url"), + client_redirect_url, + ) + + def test_multi_sso_redirect_to_unknown(self): + """An unknown IdP should cause a 400""" + channel = self.make_request( + "GET", "/_synapse/client/pick_idp?redirectUrl=http://x&idp=xyz", + ) + self.assertEqual(channel.code, 400, channel.result) + + @staticmethod + def _get_value_from_macaroon(macaroon: pymacaroons.Macaroon, key: str) -> str: + prefix = key + " = " + for caveat in macaroon.caveats: + if caveat.caveat_id.startswith(prefix): + return caveat.caveat_id[len(prefix) :] + raise ValueError("No %s caveat in macaroon" % (key,)) + + class CASTestCase(unittest.HomeserverTestCase): servlets = [ @@ -363,7 +550,7 @@ def make_homeserver(self, reactor, clock): config = self.default_config() config["cas_config"] = { "enabled": True, - "server_url": "https://fake.test", + "server_url": CAS_SERVER, "service_url": "https://matrix.goodserver.com:8448", } diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index dbc27893b538..81b7f8436068 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -444,6 +444,7 @@ async def mock_req(method: str, uri: str, data=None, headers=None): # an 'oidc_config' suitable for login_via_oidc. 
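+# The authorization endpoint is pulled out as a named constant so that tests
+# (such as the IdP picker tests above) can assert on the redirect target.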
+TEST_OIDC_AUTH_ENDPOINT = "https://issuer.test/auth" TEST_OIDC_CONFIG = { "enabled": True, "discover": False, @@ -451,7 +452,7 @@ async def mock_req(method: str, uri: str, data=None, headers=None): "client_id": "test-client-id", "client_secret": "test-client-secret", "scopes": ["profile"], - "authorization_endpoint": "https://z", + "authorization_endpoint": TEST_OIDC_AUTH_ENDPOINT, "token_endpoint": "https://issuer.test/token", "userinfo_endpoint": "https://issuer.test/userinfo", "user_mapping_provider": {"config": {"localpart_template": "{{ user.sub }}"}}, From a458e2866ee8de7374c57621c20394219ae958ed Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Jan 2021 12:26:26 +0000 Subject: [PATCH 09/26] changelog --- changelog.d/9036.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9036.feature diff --git a/changelog.d/9036.feature b/changelog.d/9036.feature new file mode 100644 index 000000000000..01a24dcf4900 --- /dev/null +++ b/changelog.d/9036.feature @@ -0,0 +1 @@ +Add support for multiple SSO Identity Providers. From 9066c2fd7f989b261d3e05a966d15ac34b2836c2 Mon Sep 17 00:00:00 2001 From: Emelie Date: Thu, 7 Jan 2021 16:31:01 +0100 Subject: [PATCH 10/26] Fix typo in docs/systemd-with-workers/README.md (#9035) Signed-off-by: Emelie em@nao.sh --- changelog.d/9035.doc | 1 + docs/systemd-with-workers/README.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9035.doc diff --git a/changelog.d/9035.doc b/changelog.d/9035.doc new file mode 100644 index 000000000000..2a7f0db51800 --- /dev/null +++ b/changelog.d/9035.doc @@ -0,0 +1 @@ +Corrected a typo in the `systemd-with-workers` documentation. diff --git a/docs/systemd-with-workers/README.md b/docs/systemd-with-workers/README.md index 8e57d4f62ec6..cfa36be7b4c5 100644 --- a/docs/systemd-with-workers/README.md +++ b/docs/systemd-with-workers/README.md @@ -31,7 +31,7 @@ There is no need for a separate configuration file for the master process. 1. Adjust synapse configuration files as above. 1. Copy the `*.service` and `*.target` files in [system](system) to `/etc/systemd/system`. -1. Run `systemctl deamon-reload` to tell systemd to load the new unit files. +1. Run `systemctl daemon-reload` to tell systemd to load the new unit files. 1. Run `systemctl enable matrix-synapse.service`. This will configure the synapse master process to be started as part of the `matrix-synapse.target` target. From 63593134a15a6b47ced61eeb0670071f89400bad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 17:20:44 +0000 Subject: [PATCH 11/26] Some cleanups to device inbox store. (#9041) --- changelog.d/9041.misc | 1 + .../replication/slave/storage/deviceinbox.py | 8 -- synapse/storage/databases/main/deviceinbox.py | 107 ++++++++++-------- 3 files changed, 59 insertions(+), 57 deletions(-) create mode 100644 changelog.d/9041.misc diff --git a/changelog.d/9041.misc b/changelog.d/9041.misc new file mode 100644 index 000000000000..4952fbe8a2ae --- /dev/null +++ b/changelog.d/9041.misc @@ -0,0 +1 @@ +Various cleanups to device inbox store. 
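The bulk of the cleanup below swaps hand-rolled `INSERT ... executemany`
statements for the `simple_insert_many_txn` helper. Schematically (a sketch
with placeholder table and column names, not an exact call site):

    self.db_pool.simple_insert_many_txn(
        txn,
        table="some_table",
        values=[{"col_a": a, "col_b": b} for a, b in rows],
    )

It also hoists `_last_device_delete_cache` into the worker store, so the cache
used to no-op repeated deletions is defined in one place for master and
workers alike.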
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 5b045bed02f9..62b68dd6e995 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -18,7 +18,6 @@ from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage.database import DatabasePool from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore -from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -37,13 +36,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._device_inbox_id_gen.get_current_token(), ) - self._last_device_delete_cache = ExpiringCache( - cache_name="last_device_delete_cache", - clock=self._clock, - max_len=10000, - expiry_ms=30 * 60 * 1000, - ) - def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == ToDeviceStream.NAME: self._device_inbox_id_gen.advance(instance_name, token) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index d42faa3f1f69..eb72c2115572 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -17,7 +17,7 @@ from typing import List, Tuple from synapse.logging.opentracing import log_kv, set_tag, trace -from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause +from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache @@ -26,6 +26,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + # Map of (user_id, device_id) to the last stream_id that has been + # deleted up to. This is so that we can no op deletions. + self._last_device_delete_cache = ExpiringCache( + cache_name="last_device_delete_cache", + clock=self._clock, + max_len=10000, + expiry_ms=30 * 60 * 1000, + ) + def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() @@ -310,20 +322,6 @@ def reindex_txn(conn): class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): - DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - # Map of (user_id, device_id) to the last stream_id that has been - # deleted up to. This is so that we can no op deletions. - self._last_device_delete_cache = ExpiringCache( - cache_name="last_device_delete_cache", - clock=self._clock, - max_len=10000, - expiry_ms=30 * 60 * 1000, - ) - @trace async def add_messages_to_device_inbox( self, @@ -351,16 +349,19 @@ def add_messages_txn(txn, now_ms, stream_id): # Add the remote messages to the federation outbox. # We'll send them to a remote server when we next send a # federation transaction to that destination. 
- sql = ( - "INSERT INTO device_federation_outbox" - " (destination, stream_id, queued_ts, messages_json)" - " VALUES (?,?,?,?)" + self.db_pool.simple_insert_many_txn( + txn, + table="device_federation_outbox", + values=[ + { + "destination": destination, + "stream_id": stream_id, + "queued_ts": now_ms, + "messages_json": json_encoder.encode(edu), + } + for destination, edu in remote_messages_by_destination.items() + ], ) - rows = [] - for destination, edu in remote_messages_by_destination.items(): - edu_json = json_encoder.encode(edu) - rows.append((destination, stream_id, now_ms, edu_json)) - txn.executemany(sql, rows) async with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() @@ -433,32 +434,37 @@ def _add_messages_to_local_device_inbox_txn( devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. - sql = "SELECT device_id FROM devices WHERE user_id = ?" - txn.execute(sql, (user_id,)) + devices = self.db_pool.simple_select_onecol_txn( + txn, + table="devices", + keyvalues={"user_id": user_id}, + retcol="device_id", + ) + message_json = json_encoder.encode(messages_by_device["*"]) - for row in txn: + for device_id in devices: # Add the message for all devices for this user on this # server. - device = row[0] - messages_json_for_user[device] = message_json + messages_json_for_user[device_id] = message_json else: if not devices: continue - clause, args = make_in_list_sql_clause( - txn.database_engine, "device_id", devices + rows = self.db_pool.simple_select_many_txn( + txn, + table="devices", + keyvalues={"user_id": user_id}, + column="device_id", + iterable=devices, + retcols=("device_id",), ) - sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause - # TODO: Maybe this needs to be done in batches if there are - # too many local devices for a given user. - txn.execute(sql, [user_id] + list(args)) - for row in txn: + for row in rows: # Only insert into the local inbox if the device exists on # this server - device = row[0] - message_json = json_encoder.encode(messages_by_device[device]) - messages_json_for_user[device] = message_json + device_id = row["device_id"] + message_json = json_encoder.encode(messages_by_device[device_id]) + messages_json_for_user[device_id] = message_json if messages_json_for_user: local_by_user_then_device[user_id] = messages_json_for_user @@ -466,14 +472,17 @@ def _add_messages_to_local_device_inbox_txn( if not local_by_user_then_device: return - sql = ( - "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" - " VALUES (?,?,?,?)" + self.db_pool.simple_insert_many_txn( + txn, + table="device_inbox", + values=[ + { + "user_id": user_id, + "device_id": device_id, + "stream_id": stream_id, + "message_json": message_json, + } + for user_id, messages_by_device in local_by_user_then_device.items() + for device_id, message_json in messages_by_device.items() + ], ) - rows = [] - for user_id, messages_by_device in local_by_user_then_device.items(): - for device_id, message_json in messages_by_device.items(): - rows.append((user_id, device_id, stream_id, message_json)) - - txn.executemany(sql, rows) From e34df813ce96168d6bd0ee6c2888122a127c1773 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 18:06:52 +0000 Subject: [PATCH 12/26] Ensure that remote users' device list resyncing always happens on master (#9043) Currently `DeviceMessageHandler` only ever exists on master, but that is about to change. 
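(The handler picks an implementation at construction time: on master it calls
`user_device_resync` on the device-list updater directly, while on workers it
uses the client generated by `ReplicationUserDevicesResyncRestServlet.make_client`,
so callers do not need to know which kind of process they are running on.)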
--- changelog.d/9043.feature | 1 + synapse/handlers/devicemessage.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 changelog.d/9043.feature diff --git a/changelog.d/9043.feature b/changelog.d/9043.feature new file mode 100644 index 000000000000..4ec319f1f291 --- /dev/null +++ b/changelog.d/9043.feature @@ -0,0 +1 @@ +Add experimental support for handling and persistence of to-device messages to happen on worker processes. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 9cac5a846396..eb10d2b4bd7f 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -24,6 +24,7 @@ set_tag, start_active_span, ) +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -50,7 +51,17 @@ def __init__(self, hs: "HomeServer"): "m.direct_to_device", self.on_direct_to_device_edu ) - self._device_list_updater = hs.get_device_handler().device_list_updater + # The handler to call when we think a user's device list might be out of + # sync. We do all device list resyncing on the master instance, so if + # we're on a worker we hit the device resync replication API. + if hs.config.worker.worker_app is None: + self._user_device_resync = ( + hs.get_device_handler().device_list_updater.user_device_resync + ) + else: + self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: local_messages = {} @@ -138,9 +149,7 @@ async def _check_for_unknown_devices( await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) # Immediately attempt a resync in the background - run_in_background( - self._device_list_updater.user_device_resync, sender_user_id - ) + run_in_background(self._user_device_resync, sender_user_id) async def send_device_message( self, From 5e99a94502ca9a3865a226097c459b2209df9023 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 18:07:28 +0000 Subject: [PATCH 13/26] Support routing edu's to multiple instances (#9042) This is in preparation for moving `SendToDeviceServlet` off master --- changelog.d/9042.feature | 1 + synapse/federation/federation_server.py | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 changelog.d/9042.feature diff --git a/changelog.d/9042.feature b/changelog.d/9042.feature new file mode 100644 index 000000000000..4ec319f1f291 --- /dev/null +++ b/changelog.d/9042.feature @@ -0,0 +1 @@ +Add experimental support for handling and persistence of to-device messages to happen on worker processes. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35e345ce7077..e5339aca239d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random from typing import ( TYPE_CHECKING, Any, @@ -860,8 +861,10 @@ def __init__(self, hs: "HomeServer"): ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] - # Map from type to instance name that we should route EDU handling to. 
- self._edu_type_to_instance = {} # type: Dict[str, str] + # Map from type to instance names that we should route EDU handling to. + # We randomly choose one instance from the list to route to for each new + # EDU received. + self._edu_type_to_instance = {} # type: Dict[str, List[str]] def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] @@ -905,7 +908,12 @@ def register_query_handler( def register_instance_for_edu(self, edu_type: str, instance_name: str): """Register that the EDU handler is on a different instance than master. """ - self._edu_type_to_instance[edu_type] = instance_name + self._edu_type_to_instance[edu_type] = [instance_name] + + def register_instances_for_edu(self, edu_type: str, instance_names: List[str]): + """Register that the EDU handler is on multiple instances. + """ + self._edu_type_to_instance[edu_type] = instance_names async def on_edu(self, edu_type: str, origin: str, content: dict): if not self.config.use_presence and edu_type == "m.presence": @@ -924,8 +932,11 @@ async def on_edu(self, edu_type: str, origin: str, content: dict): return # Check if we can route it somewhere else that isn't us - route_to = self._edu_type_to_instance.get(edu_type, "master") - if route_to != self._instance_name: + instances = self._edu_type_to_instance.get(edu_type, ["master"]) + if self._instance_name not in instances: + # Pick an instance randomly so that we don't overload one. + route_to = random.choice(instances) + try: await self._send_edu( instance_name=route_to, From b530eaa262b9c8af378f976e5d2628e8c02b10d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 20:19:26 +0000 Subject: [PATCH 14/26] Allow running sendToDevice on workers (#9044) --- changelog.d/9044.feature | 1 + scripts/synapse_port_db | 27 ++++ synapse/app/generic_worker.py | 3 + synapse/config/workers.py | 10 +- synapse/handlers/devicemessage.py | 31 +++- .../replication/slave/storage/deviceinbox.py | 32 +--- synapse/replication/tcp/handler.py | 9 ++ synapse/storage/databases/main/__init__.py | 33 ---- synapse/storage/databases/main/deviceinbox.py | 147 ++++++++++++++---- .../delta/59/02shard_send_to_device.sql | 18 +++ ...shard_send_to_device_sequence.sql.postgres | 25 +++ 11 files changed, 231 insertions(+), 105 deletions(-) create mode 100644 changelog.d/9044.feature create mode 100644 synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql create mode 100644 synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres diff --git a/changelog.d/9044.feature b/changelog.d/9044.feature new file mode 100644 index 000000000000..4ec319f1f291 --- /dev/null +++ b/changelog.d/9044.feature @@ -0,0 +1 @@ +Add experimental support for handling and persistence of to-device messages to happen on worker processes. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 5ad17aa90f1a..22dd169bfb78 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -629,6 +629,7 @@ class Porter(object): await self._setup_state_group_id_seq() await self._setup_user_id_seq() await self._setup_events_stream_seqs() + await self._setup_device_inbox_seq() # Step 3. Get tables. self.progress.set_state("Fetching tables") @@ -911,6 +912,32 @@ class Porter(object): "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) + async def _setup_device_inbox_seq(self): + """Set the device inbox sequence to the correct value. 
+ """ + curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="device_inbox", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + + curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="device_federation_outbox", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + + next_id = max(curr_local_id, curr_federation_id) + 1 + + def r(txn): + txn.execute( + "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,) + ) + + return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r) + ############################################## # The following is simply UI stuff diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index fa23d9bb20e5..44284727070b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -108,6 +108,7 @@ ) from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource @@ -520,6 +521,8 @@ def _listen_http(self, listener_config: ListenerConfig): room.register_deprecated_servlets(self, resource) InitialSyncRestServlet(self).register(resource) + SendToDeviceRestServlet(self).register(resource) + user_directory.register_servlets(self, resource) # If presence is disabled, use the stub servlet that does diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 7ca9efec525a..364583f48b0b 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -53,6 +53,9 @@ class WriterLocations: default=["master"], type=List[str], converter=_instance_to_list_converter ) typing = attr.ib(default="master", type=str) + to_device = attr.ib( + default=["master"], type=List[str], converter=_instance_to_list_converter, + ) class WorkerConfig(Config): @@ -124,7 +127,7 @@ def read_config(self, config, **kwargs): # Check that the configured writers for events and typing also appears in # `instance_map`. - for stream in ("events", "typing"): + for stream in ("events", "typing", "to_device"): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: if instance != "master" and instance not in self.instance_map: @@ -133,6 +136,11 @@ def read_config(self, config, **kwargs): % (instance, stream) ) + if len(self.writers.to_device) != 1: + raise ConfigError( + "Must only specify one instance to handle `to_device` messages." + ) + self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) # Whether this worker should run background tasks or not. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index eb10d2b4bd7f..fc974a82e860 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -45,11 +45,25 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.is_mine = hs.is_mine - self.federation = hs.get_federation_sender() - hs.get_federation_registry().register_edu_handler( - "m.direct_to_device", self.on_direct_to_device_edu - ) + # We only need to poke the federation sender explicitly if its on the + # same instance. 
Other federation sender instances will get notified by + # `synapse.app.generic_worker.FederationSenderHandler` when it sees it + # in the to-device replication stream. + self.federation_sender = None + if hs.should_send_federation(): + self.federation_sender = hs.get_federation_sender() + + # If we can handle the to device EDUs we do so, otherwise we route them + # to the appropriate worker. + if hs.get_instance_name() in hs.config.worker.writers.to_device: + hs.get_federation_registry().register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + else: + hs.get_federation_registry().register_instances_for_edu( + "m.direct_to_device", hs.config.worker.writers.to_device, + ) # The handler to call when we think a user's device list might be out of # sync. We do all device list resyncing on the master instance, so if @@ -204,7 +218,8 @@ async def send_device_message( ) log_kv({"remote_messages": remote_messages}) - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation.send_device_messages(destination) + if self.federation_sender: + for destination in remote_messages.keys(): + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation_sender.send_device_messages(destination) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 62b68dd6e995..1260f6d1412f 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -14,38 +14,8 @@ # limitations under the License. from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.replication.tcp.streams import ToDeviceStream -from synapse.storage.database import DatabasePool from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore -from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id" - ) - self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == ToDeviceStream.NAME: - self._device_inbox_id_gen.advance(instance_name, token) - for row in rows: - if row.entity.startswith("@"): - self._device_inbox_stream_cache.entity_has_changed( - row.entity, token - ) - else: - self._device_federation_outbox_stream_cache.entity_has_changed( - row.entity, token - ) - return super().process_replication_rows(stream_name, instance_name, token, rows) + pass diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 95e5502bf2b2..1f89249475e7 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -56,6 +56,7 @@ EventsStream, FederationStream, Stream, + ToDeviceStream, TypingStream, ) @@ -115,6 +116,14 @@ def __init__(self, hs): continue + if isinstance(stream, 
ToDeviceStream): + # Only add ToDeviceStream as a source on instances in charge of + # sending to device messages. + if hs.get_instance_name() in hs.config.worker.writers.to_device: + self._streams_to_replicate.append(stream) + + continue + if isinstance(stream, TypingStream): # Only add TypingStream as a source on the instance in charge of # typing. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 701748f93bf0..c4de07a0a8a1 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -127,9 +127,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" ) - self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -189,36 +186,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): prefilled_cache=presence_cache_prefill, ) - max_device_inbox_id = self._device_inbox_id_gen.get_current_token() - device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( - db_conn, - "device_inbox", - entity_column="user_id", - stream_column="stream_id", - max_value=max_device_inbox_id, - limit=1000, - ) - self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - min_device_inbox_id, - prefilled_cache=device_inbox_prefill, - ) - # The federation outbox and the local device inbox uses the same - # stream_id generator. - device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict( - db_conn, - "device_federation_outbox", - entity_column="destination", - stream_column="stream_id", - max_value=max_device_inbox_id, - limit=1000, - ) - self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - min_device_outbox_id, - prefilled_cache=device_outbox_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index eb72c2115572..58d3f71e45ca 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -17,10 +17,14 @@ from typing import List, Tuple from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -29,6 +33,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + self._instance_name = hs.get_instance_name() + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. 
self._last_device_delete_cache = ExpiringCache( @@ -38,6 +44,73 @@ def __init__(self, database: DatabasePool, db_conn, hs): expiry_ms=30 * 60 * 1000, ) + if isinstance(database.engine, PostgresEngine): + self._can_write_to_device = ( + self._instance_name in hs.config.worker.writers.to_device + ) + + self._device_inbox_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="to_device", + instance_name=self._instance_name, + table="device_inbox", + instance_column="instance_name", + id_column="stream_id", + sequence_name="device_inbox_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._can_write_to_device = True + self._device_inbox_id_gen = StreamIdGenerator( + db_conn, "device_inbox", "stream_id" + ) + + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( + db_conn, + "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id, + limit=1000, + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict( + db_conn, + "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + limit=1000, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", + min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == ToDeviceStream.NAME: + self._device_inbox_id_gen.advance(instance_name, token) + for row in rows: + if row.entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + row.entity, token + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + row.entity, token + ) + return super().process_replication_rows(stream_name, instance_name, token, rows) + def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() @@ -290,38 +363,6 @@ def get_all_new_device_messages_txn(txn): "get_all_new_device_messages", get_all_new_device_messages_txn ) - -class DeviceInboxBackgroundUpdateStore(SQLBaseStore): - DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self.db_pool.updates.register_background_index_update( - "device_inbox_stream_index", - index_name="device_inbox_stream_id_user_id", - table="device_inbox", - columns=["stream_id", "user_id"], - ) - - self.db_pool.updates.register_background_update_handler( - self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox - ) - - async def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - await self.db_pool.runWithConnection(reindex_txn) - - await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1 - - -class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): @trace async def add_messages_to_device_inbox( self, @@ -340,6 +381,8 @@ async def add_messages_to_device_inbox( The new stream_id. 
""" + assert self._can_write_to_device + def add_messages_txn(txn, now_ms, stream_id): # Add the local messages directly to the local inbox. self._add_messages_to_local_device_inbox_txn( @@ -358,6 +401,7 @@ def add_messages_txn(txn, now_ms, stream_id): "stream_id": stream_id, "queued_ts": now_ms, "messages_json": json_encoder.encode(edu), + "instance_name": self._instance_name, } for destination, edu in remote_messages_by_destination.items() ], @@ -380,6 +424,8 @@ def add_messages_txn(txn, now_ms, stream_id): async def add_messages_from_remote_to_device_inbox( self, origin: str, message_id: str, local_messages_by_user_then_device: dict ) -> int: + assert self._can_write_to_device + def add_messages_txn(txn, now_ms, stream_id): # Check if we've already inserted a matching message_id for that # origin. This can happen if the origin doesn't receive our @@ -428,6 +474,8 @@ def add_messages_txn(txn, now_ms, stream_id): def _add_messages_to_local_device_inbox_txn( self, txn, stream_id, messages_by_user_then_device ): + assert self._can_write_to_device + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} @@ -481,8 +529,43 @@ def _add_messages_to_local_device_inbox_txn( "device_id": device_id, "stream_id": stream_id, "message_json": message_json, + "instance_name": self._instance_name, } for user_id, messages_by_device in local_by_user_then_device.items() for device_id, message_json in messages_by_device.items() ], ) + + +class DeviceInboxBackgroundUpdateStore(SQLBaseStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + "device_inbox_stream_index", + index_name="device_inbox_stream_id_user_id", + table="device_inbox", + columns=["stream_id", "user_id"], + ) + + self.db_pool.updates.register_background_update_handler( + self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox + ) + + async def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + await self.db_pool.runWithConnection(reindex_txn) + + await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + pass diff --git a/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql new file mode 100644 index 000000000000..d781a92fecd0 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +ALTER TABLE device_inbox ADD COLUMN instance_name TEXT; +ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT; +ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres new file mode 100644 index 000000000000..45a845a3a5fe --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres @@ -0,0 +1,25 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence; + +-- We need to take the max across both device_inbox and device_federation_outbox +-- tables as they share the ID generator +SELECT setval('device_inbox_sequence', ( + SELECT GREATEST( + (SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox), + (SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox) + ) +)); From 23a59d24ae34493c2e54e1601b2d3757df35095e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 8 Jan 2021 14:08:44 +0000 Subject: [PATCH 15/26] Run the linters on a consistent list of files (#9038) We were running some linters on some files and some on others. Extract a common setting and use it everywhere. --- .buildkite/scripts/create_postgres_db.py | 1 + changelog.d/9038.misc | 1 + stubs/frozendict.pyi | 11 +---------- stubs/sortedcontainers/sorteddict.pyi | 6 +++--- stubs/txredisapi.pyi | 2 +- tox.ini | 20 +++++++++++++++++--- 6 files changed, 24 insertions(+), 17 deletions(-) create mode 100644 changelog.d/9038.misc diff --git a/.buildkite/scripts/create_postgres_db.py b/.buildkite/scripts/create_postgres_db.py index df6082b0acb0..956339de5cf1 100755 --- a/.buildkite/scripts/create_postgres_db.py +++ b/.buildkite/scripts/create_postgres_db.py @@ -15,6 +15,7 @@ # limitations under the License. import logging + from synapse.storage.engines import create_engine logger = logging.getLogger("create_postgres_db") diff --git a/changelog.d/9038.misc b/changelog.d/9038.misc new file mode 100644 index 000000000000..5b9e21a1dbbb --- /dev/null +++ b/changelog.d/9038.misc @@ -0,0 +1 @@ +Configure the linters to run on a consistent set of files. diff --git a/stubs/frozendict.pyi b/stubs/frozendict.pyi index 3f3af59f2631..0368ba47038b 100644 --- a/stubs/frozendict.pyi +++ b/stubs/frozendict.pyi @@ -15,16 +15,7 @@ # Stub for frozendict. -from typing import ( - Any, - Hashable, - Iterable, - Iterator, - Mapping, - overload, - Tuple, - TypeVar, -) +from typing import Any, Hashable, Iterable, Iterator, Mapping, Tuple, TypeVar, overload _KT = TypeVar("_KT", bound=Hashable) # Key type. _VT = TypeVar("_VT") # Value type. 
diff --git a/stubs/sortedcontainers/sorteddict.pyi b/stubs/sortedcontainers/sorteddict.pyi index 68779f968ed6..7b9fd079d9b1 100644 --- a/stubs/sortedcontainers/sorteddict.pyi +++ b/stubs/sortedcontainers/sorteddict.pyi @@ -7,17 +7,17 @@ from typing import ( Callable, Dict, Hashable, - Iterator, - Iterable, ItemsView, + Iterable, + Iterator, KeysView, List, Mapping, Optional, Sequence, + Tuple, Type, TypeVar, - Tuple, Union, ValuesView, overload, diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 522244bb57f7..bfac6840e6f2 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -16,7 +16,7 @@ """Contains *incomplete* type hints for txredisapi. """ -from typing import List, Optional, Union, Type +from typing import List, Optional, Type, Union class RedisProtocol: def publish(self, channel: str, message: bytes): ... diff --git a/tox.ini b/tox.ini index ab4ae295a96f..297136fcc551 100644 --- a/tox.ini +++ b/tox.ini @@ -24,6 +24,20 @@ deps = # install the "enum34" dependency of cryptography. pip>=10 +# directories/files we run the linters on +lint_targets = + setup.py + synapse + tests + scripts + scripts-dev + stubs + contrib + synctl + synmark + .buildkite + docker + # default settings for all tox environments [testenv] deps = @@ -130,13 +144,13 @@ commands = [testenv:check_codestyle] extras = lint commands = - python -m black --check --diff . - /bin/sh -c "flake8 synapse tests scripts scripts-dev contrib synctl {env:PEP8SUFFIX:}" + python -m black --check --diff {[base]lint_targets} + flake8 {[base]lint_targets} {env:PEP8SUFFIX:} {toxinidir}/scripts-dev/config-lint.sh [testenv:check_isort] extras = lint -commands = /bin/sh -c "isort -c --df --sp setup.cfg synapse tests scripts-dev scripts" +commands = isort -c --df --sp setup.cfg {[base]lint_targets} [testenv:check-newsfragment] skip_install = True From 195adf40250553b5ae6a1bd79aec57788c6977b3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 8 Jan 2021 14:09:06 +0000 Subject: [PATCH 16/26] Remove broken and unmaintained 'webserver.py' script (#9039) I'm not even sure what this was supposed to do, but the fact it has python2isms and nobody has noticed suggests it's not terribly important. It doesn't seem to have been used since ff23e5ba3764506c99d9c1c640e202fe262b65ce. --- changelog.d/9039.removal | 1 + demo/webserver.py | 59 ---------------------------------------- 2 files changed, 1 insertion(+), 59 deletions(-) create mode 100644 changelog.d/9039.removal delete mode 100644 demo/webserver.py diff --git a/changelog.d/9039.removal b/changelog.d/9039.removal new file mode 100644 index 000000000000..fb99283ed81f --- /dev/null +++ b/changelog.d/9039.removal @@ -0,0 +1 @@ +Remove broken and unmaintained `demo/webserver.py` script. 
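For context, the python2isms are visible in the deleted file below: the
`BaseHTTPServer` and `SimpleHTTPServer` modules only exist on Python 2, having
been merged into `http.server` in Python 3. Were the upload handler ever wanted
again, a rough Python 3 equivalent (purely illustrative, not part of this
change) would look something like:

    import os
    from http.server import HTTPServer, SimpleHTTPRequestHandler

    class UploadHandler(SimpleHTTPRequestHandler):
        UPLOAD_PATH = "upload"

        def do_POST(self):
            # Accept any POST as a file upload, as the deleted script did.
            path = os.path.join(self.UPLOAD_PATH, os.path.basename(self.path))
            length = int(self.headers["content-length"])
            with open(path, "wb") as fh:
                fh.write(self.rfile.read(length))
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            # http.server handlers must write bytes, not str.
            self.wfile.write(('{"url":"/%s"}' % path).encode("utf-8"))

    # HTTPServer(("", 8080), UploadHandler).serve_forever()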
diff --git a/demo/webserver.py b/demo/webserver.py deleted file mode 100644 index ba176d3bd2af..000000000000 --- a/demo/webserver.py +++ /dev/null @@ -1,59 +0,0 @@ -import argparse -import BaseHTTPServer -import os -import SimpleHTTPServer -import cgi, logging - -from daemonize import Daemonize - - -class SimpleHTTPRequestHandlerWithPOST(SimpleHTTPServer.SimpleHTTPRequestHandler): - UPLOAD_PATH = "upload" - - """ - Accept all post request as file upload - """ - - def do_POST(self): - - path = os.path.join(self.UPLOAD_PATH, os.path.basename(self.path)) - length = self.headers["content-length"] - data = self.rfile.read(int(length)) - - with open(path, "wb") as fh: - fh.write(data) - - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - - # Return the absolute path of the uploaded file - self.wfile.write('{"url":"/%s"}' % path) - - -def setup(): - parser = argparse.ArgumentParser() - parser.add_argument("directory") - parser.add_argument("-p", "--port", dest="port", type=int, default=8080) - parser.add_argument("-P", "--pid-file", dest="pid", default="web.pid") - args = parser.parse_args() - - # Get absolute path to directory to serve, as daemonize changes to '/' - os.chdir(args.directory) - dr = os.getcwd() - - httpd = BaseHTTPServer.HTTPServer(("", args.port), SimpleHTTPRequestHandlerWithPOST) - - def run(): - os.chdir(dr) - httpd.serve_forever() - - daemon = Daemonize( - app="synapse-webclient", pid=args.pid, action=run, auto_close_fds=False - ) - - daemon.start() - - -if __name__ == "__main__": - setup() From fa5f5cbc7453cf87a25fec59e98ad3d0bed3b891 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 Jan 2021 14:15:20 +0000 Subject: [PATCH 17/26] Fix error handling during insertion of client IPs (#9051) You can't continue using a transaction once an exception has been raised, so catching and dropping the error here is pointless and just causes more errors. --- changelog.d/9051.bugfix | 1 + synapse/storage/databases/main/client_ips.py | 54 +++++++++----------- 2 files changed, 24 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9051.bugfix diff --git a/changelog.d/9051.bugfix b/changelog.d/9051.bugfix new file mode 100644 index 000000000000..272be9d7a35f --- /dev/null +++ b/changelog.d/9051.bugfix @@ -0,0 +1 @@ +Fix error handling during insertion of client IPs into the database. diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index e96a8b3f433b..c53c8363376c 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -470,43 +470,35 @@ def _update_client_ips_batch_txn(self, txn, to_update): for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - try: - self.db_pool.simple_upsert_txn( + self.db_pool.simple_upsert_txn( + txn, + table="user_ips", + keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip}, + values={ + "user_agent": user_agent, + "device_id": device_id, + "last_seen": last_seen, + }, + lock=False, + ) + + # Technically an access token might not be associated with + # a device so we need to check. + if device_id: + # this is always an update rather than an upsert: the row should + # already exist, and if it doesn't, that may be because it has been + # deleted, and we don't want to re-create it. 
+                self.db_pool.simple_update_txn(
                     txn,
-                    table="user_ips",
-                    keyvalues={
-                        "user_id": user_id,
-                        "access_token": access_token,
-                        "ip": ip,
-                    },
-                    values={
+                    table="devices",
+                    keyvalues={"user_id": user_id, "device_id": device_id},
+                    updatevalues={
                         "user_agent": user_agent,
-                        "device_id": device_id,
                         "last_seen": last_seen,
+                        "ip": ip,
                     },
-                    lock=False,
                 )
 
-                # Technically an access token might not be associated with
-                # a device so we need to check.
-                if device_id:
-                    # this is always an update rather than an upsert: the row should
-                    # already exist, and if it doesn't, that may be because it has been
-                    # deleted, and we don't want to re-create it.
-                    self.db_pool.simple_update_txn(
-                        txn,
-                        table="devices",
-                        keyvalues={"user_id": user_id, "device_id": device_id},
-                        updatevalues={
-                            "user_agent": user_agent,
-                            "last_seen": last_seen,
-                            "ip": ip,
-                        },
-                    )
-            except Exception as e:
-                # Failed to upsert, log and continue
-                logger.error("Failed to insert client IP %r: %r", entry, e)
-
     async def get_last_client_ip_by_device(
         self, user_id: str, device_id: Optional[str]
     ) -> Dict[Tuple[str, str], dict]:
From d32870ffa5a2353d93e5723787d5f4dcbf14b32d Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Fri, 8 Jan 2021 14:23:04 +0000
Subject: [PATCH 18/26] Fix validate_config on nested objects (#9054)

---
 changelog.d/9054.bugfix   |  1 +
 synapse/config/_util.py   |  2 +-
 tests/config/test_util.py | 53 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 55 insertions(+), 1 deletion(-)
 create mode 100644 changelog.d/9054.bugfix
 create mode 100644 tests/config/test_util.py

diff --git a/changelog.d/9054.bugfix b/changelog.d/9054.bugfix
new file mode 100644
index 000000000000..0bfe951f173f
--- /dev/null
+++ b/changelog.d/9054.bugfix
@@ -0,0 +1 @@
+Fix a minor bug which could cause confusing error messages from invalid configurations.
diff --git a/synapse/config/_util.py b/synapse/config/_util.py
index 1bbe83c317dd..8fce7f6bb133 100644
--- a/synapse/config/_util.py
+++ b/synapse/config/_util.py
@@ -56,7 +56,7 @@ def json_error_to_config_error(
     """
     # copy `config_path` before modifying it.
     path = list(config_path)
-    for p in list(e.path):
+    for p in list(e.absolute_path):
         if isinstance(p, int):
             path.append("<item %i>" % p)
         else:
diff --git a/tests/config/test_util.py b/tests/config/test_util.py
new file mode 100644
index 000000000000..10363e3765d9
--- /dev/null
+++ b/tests/config/test_util.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.config import ConfigError
+from synapse.config._util import validate_config
+
+from tests.unittest import TestCase
+
+
+class ValidateConfigTestCase(TestCase):
+    """Test cases for synapse.config._util.validate_config"""
+
+    def test_bad_object_in_array(self):
+        """malformed objects within an array should be validated correctly"""
+
+        # consider a structure:
+        #
+        # array_of_objs:
+        #  - r: 1
+        #    foo: 2
+        #
+        #  - r: 2
+        #    bar: 3
+        #
+        # ... where each entry must contain an "r": check that the path
+        # to the required item is correctly reported.
+
+        schema = {
+            "type": "object",
+            "properties": {
+                "array_of_objs": {
+                    "type": "array",
+                    "items": {"type": "object", "required": ["r"]},
+                },
+            },
+        }
+
+        with self.assertRaises(ConfigError) as c:
+            validate_config(schema, {"array_of_objs": [{}]}, ("base",))
+
+        self.assertEqual(c.exception.path, ["base", "array_of_objs", "<item 0>"])
From a03d71dc9d60251b8b753cc223b704a4095231da Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 8 Jan 2021 14:33:53 +0000
Subject: [PATCH 19/26] Fix "Starting metrics collection from sentinel context" errors (#9053)

---
 changelog.d/9053.bugfix |  1 +
 synapse/notifier.py     | 39 +++++++++++++++++++--------------------
 synapse/util/metrics.py |  3 ++-
 3 files changed, 22 insertions(+), 21 deletions(-)
 create mode 100644 changelog.d/9053.bugfix

diff --git a/changelog.d/9053.bugfix b/changelog.d/9053.bugfix
new file mode 100644
index 000000000000..3d8bbf11a1f4
--- /dev/null
+++ b/changelog.d/9053.bugfix
@@ -0,0 +1 @@
+Fix bug where we didn't correctly record CPU time spent in 'on_new_event' block.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c4c8bb271d90..0745899b480b 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -396,31 +396,30 @@ def on_new_event(
 
         Will wake up all listeners for the given users and rooms.
""" - with PreserveLoggingContext(): - with Measure(self.clock, "on_new_event"): - user_streams = set() + with Measure(self.clock, "on_new_event"): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except Exception: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except Exception: + logger.exception("Failed to notify listener") - self.notify_replication() + self.notify_replication() - # Notify appservices - self._notify_app_services_ephemeral( - stream_key, new_token, users, - ) + # Notify appservices + self._notify_app_services_ephemeral( + stream_key, new_token, users, + ) def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 24123d5cc4d2..f4de6b9f5407 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -111,7 +111,8 @@ def __init__(self, clock, name): curr_context = current_context() if not curr_context: logger.warning( - "Starting metrics collection from sentinel context: metrics will be lost" + "Starting metrics collection %r from sentinel context: metrics will be lost", + name, ) parent_context = None else: From bce0c91d9a89097c94d687aadfed9b4ebbdcc75d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christopher=20R=C3=BCcker?= <77160940+chris-ruecker@users.noreply.github.com> Date: Fri, 8 Jan 2021 19:29:30 +0100 Subject: [PATCH 20/26] Keycloak mapping_provider example (#9037) (#9057) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds the missing user_mapping_provider section in oidc.md Signed-off-by: Christopher Rücker chris-ruecker@protonmail.com --- changelog.d/9057.doc | 1 + docs/openid.md | 4 ++++ 2 files changed, 5 insertions(+) create mode 100644 changelog.d/9057.doc diff --git a/changelog.d/9057.doc b/changelog.d/9057.doc new file mode 100644 index 000000000000..d16686e7dcdb --- /dev/null +++ b/changelog.d/9057.doc @@ -0,0 +1 @@ +Add missing user_mapping_provider configuration to the Keycloak OIDC example. Contributed by @chris-ruecker. 
diff --git a/docs/openid.md b/docs/openid.md
index da391f74aa0e..ffa4238fff50 100644
--- a/docs/openid.md
+++ b/docs/openid.md
@@ -158,6 +158,10 @@ oidc_config:
    client_id: "synapse"
    client_secret: "copy secret generated from above"
    scopes: ["openid", "profile"]
+   user_mapping_provider:
+     config:
+       localpart_template: "{{ user.preferred_username }}"
+       display_name_template: "{{ user.name }}"
 ```
 
 ### [Auth0][auth0]
From ef0388a64802d6c05bd9dcc07072149587718e9c Mon Sep 17 00:00:00 2001
From: Matthew Hodgson <matthew@matrix.org>
Date: Sun, 10 Jan 2021 23:40:12 +0000
Subject: [PATCH 21/26] fix spurious MD in README.rst

---
 README.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index 31ae5cc578d5..9ff375708ba3 100644
--- a/README.rst
+++ b/README.rst
@@ -243,7 +243,7 @@ Then update the ``users`` table in the database::
 Synapse Development
 ===================
 
-Join our developer community on Matrix: [#synapse-dev:matrix.org](https://matrix.to/#/#synapse-dev:matrix.org)
+Join our developer community on Matrix: `#synapse-dev:matrix.org <https://matrix.to/#/#synapse-dev:matrix.org>`_
 
 Before setting up a development environment for synapse, make sure you have
 the system dependencies (such as the python header files) installed - see
From c21d8f1c1d293767dd9a10464d46b5a908ab6b22 Mon Sep 17 00:00:00 2001
From: Jerin J Titus <72017981+jerinjtitus@users.noreply.github.com>
Date: Mon, 11 Jan 2021 15:53:49 +0530
Subject: [PATCH 22/26] Drop last_used column from access_tokens (#9025)

* Dropped last_used column from access_tokens

Signed-off-by: Jerin J Titus <72017981+jerinjtitus@users.noreply.github.com>
---
 changelog.d/9025.misc                         |  1 +
 .../58/28drop_last_used_column.sql.postgres   | 16 +++++
 .../58/28drop_last_used_column.sql.sqlite     | 62 +++++++++++++++++++
 3 files changed, 79 insertions(+)
 create mode 100644 changelog.d/9025.misc
 create mode 100644 synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres
 create mode 100644 synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite

diff --git a/changelog.d/9025.misc b/changelog.d/9025.misc
new file mode 100644
index 000000000000..658f50d8534a
--- /dev/null
+++ b/changelog.d/9025.misc
@@ -0,0 +1 @@
+Removed an unused column from `access_tokens` table.
diff --git a/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres
new file mode 100644
index 000000000000..de5764501975
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +ALTER TABLE access_tokens DROP COLUMN last_used; \ No newline at end of file diff --git a/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite new file mode 100644 index 000000000000..ee0e3521bfb2 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite @@ -0,0 +1,62 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + -- Dropping last_used column from access_tokens table. + +CREATE TABLE access_tokens2 ( + id BIGINT PRIMARY KEY, + user_id TEXT NOT NULL, + device_id TEXT, + token TEXT NOT NULL, + valid_until_ms BIGINT, + puppets_user_id TEXT, + last_validated BIGINT, + UNIQUE(token) +); + +INSERT INTO access_tokens2(id, user_id, device_id, token) + SELECT id, user_id, device_id, token FROM access_tokens; + +DROP TABLE access_tokens; +ALTER TABLE access_tokens2 RENAME TO access_tokens; + +CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id); + + +-- Re-adding foreign key reference in event_txn_id table + +CREATE TABLE event_txn_id2 ( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + token_id BIGINT NOT NULL, + txn_id TEXT NOT NULL, + inserted_ts BIGINT NOT NULL, + FOREIGN KEY (event_id) + REFERENCES events (event_id) ON DELETE CASCADE, + FOREIGN KEY (token_id) + REFERENCES access_tokens (id) ON DELETE CASCADE +); + +INSERT INTO event_txn_id2(event_id, room_id, user_id, token_id, txn_id, inserted_ts) + SELECT event_id, room_id, user_id, token_id, txn_id, inserted_ts FROM event_txn_id; + +DROP TABLE event_txn_id; +ALTER TABLE event_txn_id2 RENAME TO event_txn_id; + +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id); +CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts); \ No newline at end of file From 7db2622d30466700909e03f6e2d4fd12b6af0611 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 11 Jan 2021 10:24:22 +0000 Subject: [PATCH 23/26] Remove unused SynapseService (#9058) --- changelog.d/9058.misc | 1 + synapse/app/homeserver.py | 22 ---------------------- 2 files changed, 1 insertion(+), 22 deletions(-) create mode 100644 changelog.d/9058.misc diff --git a/changelog.d/9058.misc b/changelog.d/9058.misc new file mode 100644 index 000000000000..9df6796e22bd --- /dev/null +++ b/changelog.d/9058.misc @@ -0,0 +1 @@ +Remove unused `SynapseService` class. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b1d9817a6a44..42b5dc53d775 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -15,13 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import gc
 import logging
 import os
 import sys
 from typing import Iterable, Iterator
 
-from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 from twisted.web.resource import EncodingResourceWrapper, IResource
@@ -73,7 +71,6 @@
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.module_loader import load_module
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger("synapse.app.homeserver")
@@ -487,25 +484,6 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
         e = e.__cause__
 
 
-class SynapseService(service.Service):
-    """
-    A twisted Service class that will start synapse. Used to run synapse
-    via twistd and a .tac.
-    """
-
-    def __init__(self, config):
-        self.config = config
-
-    def startService(self):
-        hs = setup(self.config)
-        change_resource_limit(hs.config.soft_file_limit)
-        if hs.config.gc_thresholds:
-            gc.set_threshold(*hs.config.gc_thresholds)
-
-    def stopService(self):
-        return self._port.stopListening()
-
-
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE:
From 2fb1c2b6e619b5aec7aeb45369940721ff118c76 Mon Sep 17 00:00:00 2001
From: 0xflotus <0xflotus@gmail.com>
Date: Mon, 11 Jan 2021 13:42:18 +0100
Subject: [PATCH 24/26] Fix a typo in the install docs. (#9040)

---
 INSTALL.md           | 2 +-
 changelog.d/9040.doc | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
 create mode 100644 changelog.d/9040.doc

diff --git a/INSTALL.md b/INSTALL.md
index 598ddceb8cdd..656833637c36 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -257,7 +257,7 @@ for a number of platforms.
 
 #### Docker images and Ansible playbooks
 
-There is an offical synapse image available at
+There is an official synapse image available at
 <https://hub.docker.com/r/matrixdotorg/synapse> which can be used with
 the docker-compose file available at [contrib/docker](contrib/docker).
 Further information on this including configuration options is available
 in the README
diff --git a/changelog.d/9040.doc b/changelog.d/9040.doc
new file mode 100644
index 000000000000..5c1f7be781f8
--- /dev/null
+++ b/changelog.d/9040.doc
@@ -0,0 +1 @@
+Corrected a typo in `INSTALL.md`.
From 63f4990298ce6369c540fe8d8d8895b20b288317 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 11 Jan 2021 13:57:33 +0000
Subject: [PATCH 25/26] Ensure rejected events get added to some metadata tables (#9016)

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
---
 changelog.d/9016.misc                         |   1 +
 synapse/storage/databases/main/events.py      |  49 +++----
 .../databases/main/events_bg_updates.py       | 124 ++++++++++++++++++
 .../delta/58/28rejected_events_metadata.sql   |  17 +++
 4 files changed, 167 insertions(+), 24 deletions(-)
 create mode 100644 changelog.d/9016.misc
 create mode 100644 synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql

diff --git a/changelog.d/9016.misc b/changelog.d/9016.misc
new file mode 100644
index 000000000000..0d455b17dba7
--- /dev/null
+++ b/changelog.d/9016.misc
@@ -0,0 +1 @@
+Ensure rejected events get added to some metadata tables.
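The underlying issue: during persistence, rejected events are filtered out
before the metadata tables are written, so their `state_events` rows were never
inserted. Moving the insert into `_store_event_txn`, which runs for every
event, fixes that. Schematically (a simplified sketch of the flow in the diff
below, with method names as they appear there; details elided):

    def _persist_events_txn(txn, events_and_contexts):
        # 1. Runs for *all* events - after this patch it also writes the
        #    state_events rows, so rejected state events are recorded too.
        _store_event_txn(txn, events_and_contexts)

        # 2. Records rejections and drops rejected events from the list.
        events_and_contexts = _store_rejected_events_txn(txn, events_and_contexts)

        # 3. Only accepted events reach the remaining metadata tables.
        _update_metadata_tables_txn(txn, events_and_contexts)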
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 90fb1a1f004e..5e7753e09b4a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -799,7 +799,8 @@ def _update_outliers_txn(self, txn, events_and_contexts): return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _store_event_txn(self, txn, events_and_contexts): - """Insert new events into the event and event_json tables + """Insert new events into the event, event_json, redaction and + state_events tables. Args: txn (twisted.enterprise.adbapi.Connection): db connection @@ -871,6 +872,29 @@ def event_dict(event): updatevalues={"have_censored": False}, ) + state_events_and_contexts = [ + ec for ec in events_and_contexts if ec[0].is_state() + ] + + state_values = [] + for event, context in state_events_and_contexts: + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + state_values.append(vals) + + self.db_pool.simple_insert_many_txn( + txn, table="state_events", values=state_values + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -987,29 +1011,6 @@ def _update_metadata_tables_txn( txn, [event for event, _ in events_and_contexts] ) - state_events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].is_state() - ] - - state_values = [] - for event, context in state_events_and_contexts: - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - state_values.append(vals) - - self.db_pool.simple_insert_many_txn( - txn, table="state_events", values=state_values - ) - # Prefill the event cache self._add_to_cache(txn, events_and_contexts) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 97b675484628..7e4b175d0861 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -14,10 +14,15 @@ # limitations under the License. 
import logging +from typing import List, Tuple from synapse.api.constants import EventContentFields +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool +from synapse.storage.types import Cursor +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -99,6 +104,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): columns=["user_id", "created_ts"], ) + self.db_pool.updates.register_background_update_handler( + "rejected_events_metadata", self._rejected_events_metadata, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -582,3 +591,118 @@ def _event_store_labels_txn(txn): await self.db_pool.updates._end_background_update("event_store_labels") return num_rows + + async def _rejected_events_metadata(self, progress: dict, batch_size: int) -> int: + """Adds rejected events to the `state_events` and `event_auth` metadata + tables. + """ + + last_event_id = progress.get("last_event_id", "") + + def get_rejected_events( + txn: Cursor, + ) -> List[Tuple[str, str, JsonDict, bool, bool]]: + # Fetch rejected event json, their room version and whether we have + # inserted them into the state_events or auth_events tables. + # + # Note we can assume that events that don't have a corresponding + # room version are V1 rooms. + sql = """ + SELECT DISTINCT + event_id, + COALESCE(room_version, '1'), + json, + state_events.event_id IS NOT NULL, + event_auth.event_id IS NOT NULL + FROM rejections + INNER JOIN event_json USING (event_id) + LEFT JOIN rooms USING (room_id) + LEFT JOIN state_events USING (event_id) + LEFT JOIN event_auth USING (event_id) + WHERE event_id > ? + ORDER BY event_id + LIMIT ? + """ + + txn.execute(sql, (last_event_id, batch_size,)) + + return [(row[0], row[1], db_to_json(row[2]), row[3], row[4]) for row in txn] # type: ignore + + results = await self.db_pool.runInteraction( + desc="_rejected_events_metadata_get", func=get_rejected_events + ) + + if not results: + await self.db_pool.updates._end_background_update( + "rejected_events_metadata" + ) + return 0 + + state_events = [] + auth_events = [] + for event_id, room_version, event_json, has_state, has_event_auth in results: + last_event_id = event_id + + if has_state and has_event_auth: + continue + + room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version) + if not room_version_obj: + # We no longer support this room version, so we just ignore the + # events entirely. 
+ logger.info( + "Ignoring event with unknown room version %r: %r", + room_version, + event_id, + ) + continue + + event = make_event_from_dict(event_json, room_version_obj) + + if not event.is_state(): + continue + + if not has_state: + state_events.append( + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) + + if not has_event_auth: + for auth_id in event.auth_event_ids(): + auth_events.append( + { + "room_id": event.room_id, + "event_id": event.event_id, + "auth_id": auth_id, + } + ) + + if state_events: + await self.db_pool.simple_insert_many( + table="state_events", + values=state_events, + desc="_rejected_events_metadata_state_events", + ) + + if auth_events: + await self.db_pool.simple_insert_many( + table="event_auth", + values=auth_events, + desc="_rejected_events_metadata_event_auth", + ) + + await self.db_pool.updates._background_update_progress( + "rejected_events_metadata", {"last_event_id": last_event_id} + ) + + if len(results) < batch_size: + await self.db_pool.updates._end_background_update( + "rejected_events_metadata" + ) + + return len(results) diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql new file mode 100644 index 000000000000..9c95646281bd --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql @@ -0,0 +1,17 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5828, 'rejected_events_metadata', '{}'); From 4e04435bda135d3441777a51aa54dbd4c3925f2b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Jan 2021 13:58:19 +0000 Subject: [PATCH 26/26] Remove old tables after schema version bump (#9055) These tables are unused, and can be dropped now the schema version has been bumped. --- changelog.d/9055.misc | 1 + .../storage/databases/main/account_data.py | 48 +------------------ .../schema/delta/59/04drop_account_data.sql | 17 +++++++ .../schema/delta/59/05cache_invalidation.sql | 17 +++++++ synapse/storage/databases/main/tags.py | 10 ---- synapse/storage/prepare_database.py | 3 -- 6 files changed, 37 insertions(+), 59 deletions(-) create mode 100644 changelog.d/9055.misc create mode 100644 synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql create mode 100644 synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql diff --git a/changelog.d/9055.misc b/changelog.d/9055.misc new file mode 100644 index 000000000000..8e0512eb1e14 --- /dev/null +++ b/changelog.d/9055.misc @@ -0,0 +1 @@ +Drop unused database tables. 
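Why dropping the bookkeeping table is safe: the account data stream id
generator is now seeded directly from the surviving data tables, along the
lines of the following sketch (illustrative only, not the real
`StreamIdGenerator` implementation):

    def load_current_stream_id(txn):
        # Take the max stream_id across the tables sharing the generator.
        txn.execute("SELECT COALESCE(MAX(stream_id), 1) FROM room_account_data")
        (account_data_max,) = txn.fetchone()
        txn.execute("SELECT COALESCE(MAX(stream_id), 1) FROM room_tags_revisions")
        (tags_max,) = txn.fetchone()
        return max(account_data_max, tags_max)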
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index bff51e92b9d1..bad82608922d 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -312,12 +312,9 @@ class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
             db_conn,
-            "account_data_max_stream_id",
+            "room_account_data",
             "stream_id",
-            extra_tables=[
-                ("room_account_data", "stream_id"),
-                ("room_tags_revisions", "stream_id"),
-            ],
+            extra_tables=[("room_tags_revisions", "stream_id")],
         )

         super().__init__(database, db_conn, hs)
@@ -362,14 +359,6 @@ async def add_account_data_to_room(
             lock=False,
         )

-        # it's theoretically possible for the above to succeed and the
-        # below to fail - in which case we might reuse a stream id on
-        # restart, and the above update might not get propagated. That
-        # doesn't sound any worse than the whole update getting lost,
-        # which is what would happen if we combined the two into one
-        # transaction.
-        await self._update_max_stream_id(next_id)
-
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
         self.get_account_data_for_user.invalidate((user_id,))
         self.get_account_data_for_room.invalidate((user_id, room_id))
@@ -402,18 +391,6 @@ async def add_account_data_for_user(
             content,
         )

-        # it's theoretically possible for the above to succeed and the
-        # below to fail - in which case we might reuse a stream id on
-        # restart, and the above update might not get propagated. That
-        # doesn't sound any worse than the whole update getting lost,
-        # which is what would happen if we combined the two into one
-        # transaction.
-        #
-        # Note: This is only here for backwards compat to allow admins to
-        # roll back to a previous Synapse version. Next time we update the
-        # database version we can remove this table.
-        await self._update_max_stream_id(next_id)
-
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
         self.get_account_data_for_user.invalidate((user_id,))
         self.get_global_account_data_by_type_for_user.invalidate(
@@ -486,24 +463,3 @@ def _add_account_data_for_user(
         # Invalidate the cache for any ignored users which were added or removed.
         for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
             self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
-
-    async def _update_max_stream_id(self, next_id: int) -> None:
-        """Update the max stream_id
-
-        Args:
-            next_id: The the revision to advance to.
-        """
-
-        # Note: This is only here for backwards compat to allow admins to
-        # roll back to a previous Synapse version. Next time we update the
-        # database version we can remove this table.
-
-        def _update(txn):
-            update_max_id_sql = (
-                "UPDATE account_data_max_stream_id"
-                " SET stream_id = ?"
-                " WHERE stream_id < ?"
-            )
-            txn.execute(update_max_id_sql, (next_id, next_id))
-
-        await self.db_pool.runInteraction("update_account_data_max_stream_id", _update)
diff --git a/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
new file mode 100644
index 000000000000..64ab696cfe16
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS account_data_max_stream_id;
diff --git a/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
new file mode 100644
index 000000000000..fb71b360a09d
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS cache_invalidation_stream;
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 9f120d3cb66c..74da9c49f24a 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -255,16 +255,6 @@ def _update_revision_txn(
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )

-        # Note: This is only here for backwards compat to allow admins to
-        # roll back to a previous Synapse version. Next time we update the
-        # database version we can remove this table.
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
-        )
-        txn.execute(update_max_id_sql, (next_id, next_id))
-
         update_sql = (
             "UPDATE room_tags_revisions"
             " SET stream_id = ?"
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 01efb2cabb7f..566ea19bae06 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -35,9 +35,6 @@

 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-# XXX: If you're about to bump this to 59 (or higher) please create an update -# that drops the unused `cache_invalidation_stream` table, as per #7436! -# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! SCHEMA_VERSION = 59 dir_path = os.path.abspath(os.path.dirname(__file__))
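A closing note on the two new delta files: because they use DROP TABLE IF EXISTS they are idempotent, and they are picked up by the ordinary per-version delta sweep once SCHEMA_VERSION reaches 59. The sketch below shows the general shape of such a sweep under simplifying assumptions (a flat <delta_dir>/<version>/*.sql layout and a pre-existing schema_version table); it is illustrative only, not the actual logic in synapse/storage/prepare_database.py, which also handles workers, engine-specific .sqlite/.postgres files and per-delta bookkeeping:

```python
# Illustrative sketch of a versioned schema-delta sweep. Assumes a layout of
# <delta_dir>/<version>/*.sql and an existing schema_version table; this is
# not Synapse's real upgrade code.
import os
import sqlite3

SCHEMA_VERSION = 59


def apply_deltas(conn: sqlite3.Connection, delta_dir: str, start_ver: int) -> None:
    for v in range(start_ver, SCHEMA_VERSION + 1):
        version_dir = os.path.join(delta_dir, str(v))
        if os.path.isdir(version_dir):
            # Apply each delta for this version in a deterministic order.
            for name in sorted(os.listdir(version_dir)):
                if name.endswith(".sql"):
                    with open(os.path.join(version_dir, name)) as f:
                        conn.executescript(f.read())
        # Record that the database is now at version v.
        conn.execute("DELETE FROM schema_version")
        conn.execute(
            "INSERT INTO schema_version (version, upgraded) VALUES (?, ?)",
            (v, True),
        )
    conn.commit()
```

Re-running a DROP TABLE IF EXISTS delta against a database where the table is already gone is a no-op, so a partially applied upgrade can simply be retried.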