From be5a96fa9498943278c322060fbc7f7dcfb19767 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 15 May 2024 15:18:30 -0500 Subject: [PATCH 1/7] Refactor to be able to return different sync responses (`SyncVersion`) Split upon request: https://github.com/element-hq/synapse/pull/17167#discussion_r1601497279 Split from https://github.com/element-hq/synapse/pull/17167 where we will add `SyncVersion.E2EE_SYNC` and a new type of sync response. --- synapse/handlers/sync.py | 53 +++++++++++++++--- synapse/rest/client/sync.py | 2 + tests/events/test_presence_router.py | 4 +- tests/handlers/test_sync.py | 81 ++++++++++++++++++++++------ 4 files changed, 115 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0bef58351c5..42d68a6dfa6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ # import itertools import logging +from enum import Enum from typing import ( TYPE_CHECKING, AbstractSet, @@ -112,6 +113,23 @@ SyncRequestKey = Tuple[Any, ...] +class SyncVersion(Enum): + """ + Enum for specifying the version of sync request. This is used to key which type of + sync response that we are generating. + + This is different than the `sync_type` you might see used in other code below; which + specifies the sub-type sync request (e.g. initial_sync, full_state_sync, + incremental_sync) and is really only relevant for the `/sync` v2 endpoint. + """ + + # These string values are semantically significant because they are used in the the + # metrics + + # Traditional `/sync` endpoint + SYNC_V2 = "sync_v2" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class SyncConfig: user: UserID @@ -309,6 +327,7 @@ async def wait_for_sync_for_user( self, requester: Requester, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken] = None, timeout: int = 0, full_state: bool = False, @@ -327,6 +346,7 @@ async def wait_for_sync_for_user( sync_config.request_key, self._wait_for_sync_for_user, sync_config, + sync_version, since_token, timeout, full_state, @@ -338,6 +358,7 @@ async def wait_for_sync_for_user( async def _wait_for_sync_for_user( self, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken], timeout: int, full_state: bool, @@ -363,9 +384,11 @@ async def _wait_for_sync_for_user( else: sync_type = "incremental_sync" + sync_label = f"{sync_version}:{sync_type}" + context = current_context() if context: - context.tag = sync_type + context.tag = sync_label # if we have a since token, delete any to-device messages before that token # (since we now know that the device has received them) @@ -384,14 +407,16 @@ async def _wait_for_sync_for_user( # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result: SyncResult = await self.current_sync_for_user( - sync_config, since_token, full_state=full_state + sync_config, sync_version, since_token, full_state=full_state ) else: # Otherwise, we wait for something to happen and report it to the user. async def current_sync_callback( before_token: StreamToken, after_token: StreamToken ) -> SyncResult: - return await self.current_sync_for_user(sync_config, since_token) + return await self.current_sync_for_user( + sync_config, sync_version, since_token + ) result = await self.notifier.wait_for_events( sync_config.user.to_string(), @@ -416,13 +441,14 @@ async def current_sync_callback( lazy_loaded = "true" else: lazy_loaded = "false" - non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + non_empty_sync_counter.labels(sync_label, lazy_loaded).inc() return result async def current_sync_for_user( self, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: @@ -431,12 +457,25 @@ async def current_sync_for_user( This is a wrapper around `generate_sync_result` which starts an open tracing span to track the sync. See `generate_sync_result` for the next part of your indoctrination. + Args: + sync_config: Config/info necessary to process the sync request. + sync_version: Determines what kind of sync response to generate. + since_token: The point in the stream to sync from.p. + full_state: Whether to return the full state for each room. + Returns: + When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ with start_active_span("sync.current_sync_for_user"): log_kv({"since_token": since_token}) - sync_result = await self.generate_sync_result( - sync_config, since_token, full_state - ) + # Go through the `/sync` v2 path + if sync_version == SyncVersion.SYNC_V2: + sync_result: SyncResult = await self.generate_sync_result( + sync_config, since_token, full_state + ) + else: + raise Exception( + f"Unknown sync_version (this is a Synapse problem): {sync_version}" + ) set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) return sync_result diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d19aaf0e223..d0713536e12 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -40,6 +40,7 @@ KnockedSyncResult, SyncConfig, SyncResult, + SyncVersion, ) from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string @@ -232,6 +233,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: sync_result = await self.sync_handler.wait_for_sync_for_user( requester, sync_config, + SyncVersion.SYNC_V2, since_token=since_token, timeout=timeout, full_state=full_state, diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py index e51cdf01ab0..aa67afa6956 100644 --- a/tests/events/test_presence_router.py +++ b/tests/events/test_presence_router.py @@ -36,7 +36,7 @@ from synapse.types import JsonDict, StreamToken, create_requester from synapse.util import Clock -from tests.handlers.test_sync import generate_sync_config +from tests.handlers.test_sync import SyncVersion, generate_sync_config from tests.unittest import ( FederatingHomeserverTestCase, HomeserverTestCase, @@ -521,7 +521,7 @@ def sync_presence( sync_config = generate_sync_config(requester.user.to_string()) sync_result = testcase.get_success( testcase.hs.get_sync_handler().wait_for_sync_for_user( - requester, sync_config, since_token + requester, sync_config, SyncVersion.SYNC_V2, since_token ) ) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 2780d29cada..9c12a11e3a8 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -31,7 +31,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_base import event_from_pdu_json -from synapse.handlers.sync import SyncConfig, SyncResult +from synapse.handlers.sync import SyncConfig, SyncResult, SyncVersion from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -73,13 +73,21 @@ def test_wait_for_sync_for_user_auth_blocking(self) -> None: # Check that the happy case does not throw errors self.get_success(self.store.upsert_monthly_active_user(user_id1)) self.get_success( - self.sync_handler.wait_for_sync_for_user(requester, sync_config) + self.sync_handler.wait_for_sync_for_user( + requester, + sync_config, + sync_version=SyncVersion.SYNC_V2, + ) ) # Test that global lock works self.auth_blocking._hs_disabled = True e = self.get_failure( - self.sync_handler.wait_for_sync_for_user(requester, sync_config), + self.sync_handler.wait_for_sync_for_user( + requester, + sync_config, + sync_version=SyncVersion.SYNC_V2, + ), ResourceLimitError, ) self.assertEqual(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) @@ -90,7 +98,11 @@ def test_wait_for_sync_for_user_auth_blocking(self) -> None: requester = create_requester(user_id2) e = self.get_failure( - self.sync_handler.wait_for_sync_for_user(requester, sync_config), + self.sync_handler.wait_for_sync_for_user( + requester, + sync_config, + sync_version=SyncVersion.SYNC_V2, + ), ResourceLimitError, ) self.assertEqual(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) @@ -109,7 +121,9 @@ def test_unknown_room_version(self) -> None: requester = create_requester(user) initial_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - requester, sync_config=generate_sync_config(user, device_id="dev") + requester, + sync_config=generate_sync_config(user, device_id="dev"), + sync_version=SyncVersion.SYNC_V2, ) ) @@ -140,7 +154,9 @@ def test_unknown_room_version(self) -> None: # The rooms should appear in the sync response. result = self.get_success( self.sync_handler.wait_for_sync_for_user( - requester, sync_config=generate_sync_config(user) + requester, + sync_config=generate_sync_config(user), + sync_version=SyncVersion.SYNC_V2, ) ) self.assertIn(joined_room, [r.room_id for r in result.joined]) @@ -152,6 +168,7 @@ def test_unknown_room_version(self) -> None: self.sync_handler.wait_for_sync_for_user( requester, sync_config=generate_sync_config(user, device_id="dev"), + sync_version=SyncVersion.SYNC_V2, since_token=initial_result.next_batch, ) ) @@ -180,7 +197,9 @@ def test_unknown_room_version(self) -> None: # Get a new request key. result = self.get_success( self.sync_handler.wait_for_sync_for_user( - requester, sync_config=generate_sync_config(user) + requester, + sync_config=generate_sync_config(user), + sync_version=SyncVersion.SYNC_V2, ) ) self.assertNotIn(joined_room, [r.room_id for r in result.joined]) @@ -192,6 +211,7 @@ def test_unknown_room_version(self) -> None: self.sync_handler.wait_for_sync_for_user( requester, sync_config=generate_sync_config(user, device_id="dev"), + sync_version=SyncVersion.SYNC_V2, since_token=initial_result.next_batch, ) ) @@ -231,7 +251,9 @@ def test_ban_wins_race_with_join(self) -> None: # Do a sync as Alice to get the latest event in the room. alice_sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( - create_requester(owner), generate_sync_config(owner) + create_requester(owner), + generate_sync_config(owner), + sync_version=SyncVersion.SYNC_V2, ) ) self.assertEqual(len(alice_sync_result.joined), 1) @@ -251,7 +273,11 @@ def test_ban_wins_race_with_join(self) -> None: eve_requester = create_requester(eve) eve_sync_config = generate_sync_config(eve) eve_sync_after_ban: SyncResult = self.get_success( - self.sync_handler.wait_for_sync_for_user(eve_requester, eve_sync_config) + self.sync_handler.wait_for_sync_for_user( + eve_requester, + eve_sync_config, + sync_version=SyncVersion.SYNC_V2, + ) ) # Sanity check this sync result. We shouldn't be joined to the room. @@ -268,6 +294,7 @@ def test_ban_wins_race_with_join(self) -> None: self.sync_handler.wait_for_sync_for_user( eve_requester, eve_sync_config, + sync_version=SyncVersion.SYNC_V2, since_token=eve_sync_after_ban.next_batch, ) ) @@ -279,6 +306,7 @@ def test_ban_wins_race_with_join(self) -> None: self.sync_handler.wait_for_sync_for_user( eve_requester, eve_sync_config, + sync_version=SyncVersion.SYNC_V2, since_token=None, ) ) @@ -310,7 +338,9 @@ def test_state_includes_changes_on_forks(self) -> None: # Do an initial sync as Alice to get a known starting point. initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - alice_requester, generate_sync_config(alice) + alice_requester, + generate_sync_config(alice), + sync_version=SyncVersion.SYNC_V2, ) ) last_room_creation_event_id = ( @@ -338,6 +368,7 @@ def test_state_includes_changes_on_forks(self) -> None: self.hs, {"room": {"timeline": {"limit": 2}}} ), ), + sync_version=SyncVersion.SYNC_V2, since_token=initial_sync_result.next_batch, ) ) @@ -380,7 +411,9 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: # Do an initial sync as Alice to get a known starting point. initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - alice_requester, generate_sync_config(alice) + alice_requester, + generate_sync_config(alice), + sync_version=SyncVersion.SYNC_V2, ) ) last_room_creation_event_id = ( @@ -418,6 +451,7 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: }, ), ), + sync_version=SyncVersion.SYNC_V2, since_token=initial_sync_result.next_batch, ) ) @@ -461,7 +495,9 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: # Do an initial sync as Alice to get a known starting point. initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - alice_requester, generate_sync_config(alice) + alice_requester, + generate_sync_config(alice), + sync_version=SyncVersion.SYNC_V2, ) ) last_room_creation_event_id = ( @@ -486,6 +522,7 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: self.hs, {"room": {"timeline": {"limit": 1}}} ), ), + sync_version=SyncVersion.SYNC_V2, since_token=initial_sync_result.next_batch, ) ) @@ -515,6 +552,7 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: self.hs, {"room": {"timeline": {"limit": 1}}} ), ), + sync_version=SyncVersion.SYNC_V2, since_token=incremental_sync.next_batch, ) ) @@ -574,7 +612,9 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: # Do an initial sync to get a known starting point. initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - alice_requester, generate_sync_config(alice) + alice_requester, + generate_sync_config(alice), + sync_version=SyncVersion.SYNC_V2, ) ) last_room_creation_event_id = ( @@ -598,6 +638,7 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: self.hs, {"room": {"timeline": {"limit": 1}}} ), ), + sync_version=SyncVersion.SYNC_V2, ) ) room_sync = initial_sync_result.joined[0] @@ -618,6 +659,7 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: self.sync_handler.wait_for_sync_for_user( alice_requester, generate_sync_config(alice), + sync_version=SyncVersion.SYNC_V2, since_token=initial_sync_result.next_batch, ) ) @@ -668,7 +710,9 @@ def test_archived_rooms_do_not_include_state_after_leave( initial_sync_result = self.get_success( self.sync_handler.wait_for_sync_for_user( - bob_requester, generate_sync_config(bob) + bob_requester, + generate_sync_config(bob), + sync_version=SyncVersion.SYNC_V2, ) ) @@ -699,6 +743,7 @@ def test_archived_rooms_do_not_include_state_after_leave( generate_sync_config( bob, filter_collection=FilterCollection(self.hs, filter_dict) ), + sync_version=SyncVersion.SYNC_V2, since_token=None if initial_sync else initial_sync_result.next_batch, ) ).archived[0] @@ -791,7 +836,9 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( # but that it does not come down /sync in public room sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( - create_requester(user), generate_sync_config(user) + create_requester(user), + generate_sync_config(user), + sync_version=SyncVersion.SYNC_V2, ) ) event_ids = [] @@ -837,7 +884,9 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( private_sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( - create_requester(user2), generate_sync_config(user2) + create_requester(user2), + generate_sync_config(user2), + sync_version=SyncVersion.SYNC_V2, ) ) priv_event_ids = [] From d9b0669ac7e83f7b5049857cc1f1207b26891ece Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 15 May 2024 15:26:04 -0500 Subject: [PATCH 2/7] Add changelog --- changelog.d/17200.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17200.misc diff --git a/changelog.d/17200.misc b/changelog.d/17200.misc new file mode 100644 index 00000000000..a02b315041d --- /dev/null +++ b/changelog.d/17200.misc @@ -0,0 +1 @@ +Prepare sync handler to be able to return different sync responses (`SyncVersion`). From 5d6efb014754a1532582d61629dfb48eb3b7ac87 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 15 May 2024 15:32:56 -0500 Subject: [PATCH 3/7] Update docstrings --- synapse/handlers/sync.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 42d68a6dfa6..53fe2a6a534 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -335,6 +335,17 @@ async def wait_for_sync_for_user( """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. + + Args: + requester: The user requesting the sync response. + sync_config: Config/info necessary to process the sync request. + sync_version: Determines what kind of sync response to generate. + since_token: The point in the stream to sync from. + timeout: How long to wait for new data to arrive before giving up. + full_state: Whether to return the full state for each room. + + Returns: + When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -457,6 +468,7 @@ async def current_sync_for_user( This is a wrapper around `generate_sync_result` which starts an open tracing span to track the sync. See `generate_sync_result` for the next part of your indoctrination. + Args: sync_config: Config/info necessary to process the sync request. sync_version: Determines what kind of sync response to generate. From 562105ad8f0aeaa2d07b7918c62829e85c982a05 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 15 May 2024 15:40:55 -0500 Subject: [PATCH 4/7] Move sync cache `request_key` outside of the config Removed `request_key` from the `SyncConfig` (moved outside as its own function parameter) so it doesn't have to flow into `_generate_sync_entry_for_xxx` methods. This way we can separate the concerns of caching from generating the response and reuse the `_generate_sync_entry_for_xxx` functions as we see fit in this new usage. Plus caching doesn't really have anything to do with the config of sync. Split from https://github.com/element-hq/synapse/pull/17167 Spawning from https://github.com/element-hq/synapse/pull/17167#discussion_r1601497279 --- synapse/handlers/sync.py | 6 ++-- synapse/rest/client/sync.py | 2 +- tests/events/test_presence_router.py | 17 +++++++++-- tests/handlers/test_sync.py | 42 +++++++++++++++++++++++----- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 53fe2a6a534..f1f20ce61ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -135,7 +135,6 @@ class SyncConfig: user: UserID filter_collection: FilterCollection is_guest: bool - request_key: SyncRequestKey device_id: Optional[str] @@ -328,6 +327,7 @@ async def wait_for_sync_for_user( requester: Requester, sync_config: SyncConfig, sync_version: SyncVersion, + request_key: SyncRequestKey, since_token: Optional[StreamToken] = None, timeout: int = 0, full_state: bool = False, @@ -340,10 +340,10 @@ async def wait_for_sync_for_user( requester: The user requesting the sync response. sync_config: Config/info necessary to process the sync request. sync_version: Determines what kind of sync response to generate. + request_key: The key to use for caching the response. since_token: The point in the stream to sync from. timeout: How long to wait for new data to arrive before giving up. full_state: Whether to return the full state for each room. - Returns: When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ @@ -354,7 +354,7 @@ async def wait_for_sync_for_user( await self.auth_blocking.check_auth_blocking(requester=requester) res = await self.response_cache.wrap( - sync_config.request_key, + request_key, self._wait_for_sync_for_user, sync_config, sync_version, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d0713536e12..4a57eaf9307 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -210,7 +210,6 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: user=user, filter_collection=filter_collection, is_guest=requester.is_guest, - request_key=request_key, device_id=device_id, ) @@ -234,6 +233,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester, sync_config, SyncVersion.SYNC_V2, + request_key, since_token=since_token, timeout=timeout, full_state=full_state, diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py index aa67afa6956..e48983ddfe7 100644 --- a/tests/events/test_presence_router.py +++ b/tests/events/test_presence_router.py @@ -36,7 +36,7 @@ from synapse.types import JsonDict, StreamToken, create_requester from synapse.util import Clock -from tests.handlers.test_sync import SyncVersion, generate_sync_config +from tests.handlers.test_sync import SyncRequestKey, SyncVersion, generate_sync_config from tests.unittest import ( FederatingHomeserverTestCase, HomeserverTestCase, @@ -498,6 +498,15 @@ def send_presence_update( return channel.json_body +_request_key = 0 + + +def generate_request_key() -> SyncRequestKey: + global _request_key + _request_key += 1 + return ("request_key", _request_key) + + def sync_presence( testcase: HomeserverTestCase, user_id: str, @@ -521,7 +530,11 @@ def sync_presence( sync_config = generate_sync_config(requester.user.to_string()) sync_result = testcase.get_success( testcase.hs.get_sync_handler().wait_for_sync_for_user( - requester, sync_config, SyncVersion.SYNC_V2, since_token + requester, + sync_config, + SyncVersion.SYNC_V2, + generate_request_key(), + since_token, ) ) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 9c12a11e3a8..118aeaa9d59 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -31,7 +31,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_base import event_from_pdu_json -from synapse.handlers.sync import SyncConfig, SyncResult, SyncVersion +from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVersion from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -41,6 +41,14 @@ import tests.unittest import tests.utils +_request_key = 0 + + +def generate_request_key() -> SyncRequestKey: + global _request_key + _request_key += 1 + return ("request_key", _request_key) + class SyncTestCase(tests.unittest.HomeserverTestCase): """Tests Sync Handler.""" @@ -77,6 +85,7 @@ def test_wait_for_sync_for_user_auth_blocking(self) -> None: requester, sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) @@ -87,6 +96,7 @@ def test_wait_for_sync_for_user_auth_blocking(self) -> None: requester, sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ), ResourceLimitError, ) @@ -102,6 +112,7 @@ def test_wait_for_sync_for_user_auth_blocking(self) -> None: requester, sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ), ResourceLimitError, ) @@ -124,6 +135,7 @@ def test_unknown_room_version(self) -> None: requester, sync_config=generate_sync_config(user, device_id="dev"), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) @@ -157,6 +169,7 @@ def test_unknown_room_version(self) -> None: requester, sync_config=generate_sync_config(user), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) self.assertIn(joined_room, [r.room_id for r in result.joined]) @@ -169,6 +182,7 @@ def test_unknown_room_version(self) -> None: requester, sync_config=generate_sync_config(user, device_id="dev"), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_result.next_batch, ) ) @@ -200,6 +214,7 @@ def test_unknown_room_version(self) -> None: requester, sync_config=generate_sync_config(user), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) self.assertNotIn(joined_room, [r.room_id for r in result.joined]) @@ -212,6 +227,7 @@ def test_unknown_room_version(self) -> None: requester, sync_config=generate_sync_config(user, device_id="dev"), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_result.next_batch, ) ) @@ -254,6 +270,7 @@ def test_ban_wins_race_with_join(self) -> None: create_requester(owner), generate_sync_config(owner), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) self.assertEqual(len(alice_sync_result.joined), 1) @@ -277,6 +294,7 @@ def test_ban_wins_race_with_join(self) -> None: eve_requester, eve_sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) @@ -295,6 +313,7 @@ def test_ban_wins_race_with_join(self) -> None: eve_requester, eve_sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=eve_sync_after_ban.next_batch, ) ) @@ -307,6 +326,7 @@ def test_ban_wins_race_with_join(self) -> None: eve_requester, eve_sync_config, sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=None, ) ) @@ -341,6 +361,7 @@ def test_state_includes_changes_on_forks(self) -> None: alice_requester, generate_sync_config(alice), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) last_room_creation_event_id = ( @@ -369,6 +390,7 @@ def test_state_includes_changes_on_forks(self) -> None: ), ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_sync_result.next_batch, ) ) @@ -414,6 +436,7 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: alice_requester, generate_sync_config(alice), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) last_room_creation_event_id = ( @@ -452,6 +475,7 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: ), ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_sync_result.next_batch, ) ) @@ -498,6 +522,7 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: alice_requester, generate_sync_config(alice), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) last_room_creation_event_id = ( @@ -523,6 +548,7 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: ), ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_sync_result.next_batch, ) ) @@ -553,6 +579,7 @@ def test_state_includes_changes_on_long_lived_forks(self) -> None: ), ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=incremental_sync.next_batch, ) ) @@ -615,6 +642,7 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: alice_requester, generate_sync_config(alice), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) last_room_creation_event_id = ( @@ -639,6 +667,7 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: ), ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) room_sync = initial_sync_result.joined[0] @@ -660,6 +689,7 @@ def test_state_includes_changes_on_ungappy_syncs(self) -> None: alice_requester, generate_sync_config(alice), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=initial_sync_result.next_batch, ) ) @@ -713,6 +743,7 @@ def test_archived_rooms_do_not_include_state_after_leave( bob_requester, generate_sync_config(bob), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) @@ -744,6 +775,7 @@ def test_archived_rooms_do_not_include_state_after_leave( bob, filter_collection=FilterCollection(self.hs, filter_dict) ), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), since_token=None if initial_sync else initial_sync_result.next_batch, ) ).archived[0] @@ -839,6 +871,7 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( create_requester(user), generate_sync_config(user), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) event_ids = [] @@ -887,6 +920,7 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( create_requester(user2), generate_sync_config(user2), sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) priv_event_ids = [] @@ -896,9 +930,6 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( self.assertIn(private_call_event.event_id, priv_event_ids) -_request_key = 0 - - def generate_sync_config( user_id: str, device_id: Optional[str] = "device_id", @@ -915,12 +946,9 @@ def generate_sync_config( if filter_collection is None: filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION - global _request_key - _request_key += 1 return SyncConfig( user=UserID.from_string(user_id), filter_collection=filter_collection, is_guest=False, - request_key=("request_key", _request_key), device_id=device_id, ) From 1403d2de4c3eda40ee40fd1a348a5477ffec9a51 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 15 May 2024 15:46:05 -0500 Subject: [PATCH 5/7] Add changelog --- changelog.d/17201.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17201.misc diff --git a/changelog.d/17201.misc b/changelog.d/17201.misc new file mode 100644 index 00000000000..2bd08d8f061 --- /dev/null +++ b/changelog.d/17201.misc @@ -0,0 +1 @@ +Organize the sync cache key parameter outside of the sync config (separate concerns). From 2942a76621bdac819945ee51a456ba00a930cf53 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 11:28:56 -0500 Subject: [PATCH 6/7] Fix missing test from bad conflict --- tests/handlers/test_sync.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 118aeaa9d59..0f5fae46382 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -24,7 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import EventTypes, JoinRules +from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError from synapse.api.filtering import FilterCollection, Filtering from synapse.api.room_versions import RoomVersion, RoomVersions @@ -928,7 +928,33 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( priv_event_ids.append(event.event_id) self.assertIn(private_call_event.event_id, priv_event_ids) + + def test_push_rules_with_bad_account_data(self) -> None: + """Some old accounts have managed to set a `m.push_rules` account data, + which we should ignore in /sync response. + """ + + user = self.register_user("alice", "password") + + # Insert the bad account data. + self.get_success( + self.store.add_account_data_for_user(user, AccountDataTypes.PUSH_RULES, {}) + ) + + sync_result: SyncResult = self.get_success( + self.sync_handler.wait_for_sync_for_user( + create_requester(user), generate_sync_config(user) + ) + ) + + for account_dict in sync_result.account_data: + if account_dict["type"] == AccountDataTypes.PUSH_RULES: + # We should have lots of push rules here, rather than the bad + # empty data. + self.assertNotEqual(account_dict["content"], {}) + return + self.fail("No push rules found") def generate_sync_config( user_id: str, From 644af1949df5bcb07194df131edbb8b5d538f6b9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 11:30:19 -0500 Subject: [PATCH 7/7] Add in changes --- tests/handlers/test_sync.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 0f5fae46382..02371ce7247 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -928,7 +928,7 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( priv_event_ids.append(event.event_id) self.assertIn(private_call_event.event_id, priv_event_ids) - + def test_push_rules_with_bad_account_data(self) -> None: """Some old accounts have managed to set a `m.push_rules` account data, which we should ignore in /sync response. @@ -943,7 +943,10 @@ def test_push_rules_with_bad_account_data(self) -> None: sync_result: SyncResult = self.get_success( self.sync_handler.wait_for_sync_for_user( - create_requester(user), generate_sync_config(user) + create_requester(user), + generate_sync_config(user), + sync_version=SyncVersion.SYNC_V2, + request_key=generate_request_key(), ) ) @@ -956,6 +959,7 @@ def test_push_rules_with_bad_account_data(self) -> None: self.fail("No push rules found") + def generate_sync_config( user_id: str, device_id: Optional[str] = "device_id",