diff --git a/changelog.d/16223.feature b/changelog.d/16223.feature new file mode 100644 index 000000000000..a52d66658bbd --- /dev/null +++ b/changelog.d/16223.feature @@ -0,0 +1 @@ +Improve resource usage when sending data to a large number of remote hosts that are marked as "down". diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index fb448f215566..652079563557 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -49,7 +49,7 @@ from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import LaterGauge from synapse.replication.tcp.streams.federation import FederationStream -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util.metrics import Measure from .units import Edu @@ -229,7 +229,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: """ # nothing to do here: the replication listener will handle it. - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """As per FederationSender @@ -245,7 +245,9 @@ def send_presence_to_destinations( self.notifier.on_new_replication_data() - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. @@ -463,7 +465,7 @@ class ParsedFederationStreamData: edus: Dict[str, List[Edu]] -def process_rows_for_federation( +async def process_rows_for_federation( transaction_queue: FederationSender, rows: List[FederationStream.FederationStreamRow], ) -> None: @@ -496,7 +498,7 @@ def process_rows_for_federation( parsed_row.add_to_buffer(buff) for state, destinations in buff.presence_destinations: - transaction_queue.send_presence_to_destinations( + await transaction_queue.send_presence_to_destinations( states=[state], destinations=destinations ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 97abbdee18e5..fb20fd8a10dc 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -147,7 +147,10 @@ import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase -from synapse.federation.sender.per_destination_queue import PerDestinationQueue +from synapse.federation.sender.per_destination_queue import ( + CATCHUP_RETRY_INTERVAL, + PerDestinationQueue, +) from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -161,9 +164,10 @@ run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util import Clock from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -213,7 +217,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: raise NotImplementedError() @abc.abstractmethod - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -242,9 +246,11 @@ def build_and_send_edu( raise NotImplementedError() @abc.abstractmethod - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """Tells the sender that a new device message is ready to be sent to the - destination. The `immediate` flag specifies whether the messages should + destinations. The `immediate` flag specifies whether the messages should be tried to be sent immediately, or whether it can be delayed for a short while (to aid performance). """ @@ -716,6 +722,13 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: pdu.internal_metadata.stream_ordering, ) + destinations = await filter_destinations_by_retry_limiter( + destinations, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) @@ -763,12 +776,20 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) - domains = [ + domains: StrCollection = [ d for d in domains_set if not self.is_mine_server_name(d) and self._federation_shard_config.should_handle(self._instance_name, d) ] + + domains = await filter_destinations_by_retry_limiter( + domains, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + if not domains: return @@ -816,7 +837,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None: for queue in queues: queue.flush_read_receipts_for_room(room_id) - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -831,13 +852,20 @@ def send_presence_to_destinations( for state in states: assert self.is_mine_id(state.user_id) + destinations = await filter_destinations_by_retry_limiter( + [ + d + for d in destinations + if self._federation_shard_config.should_handle(self._instance_name, d) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: if self.is_mine_server_name(destination): continue - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue self._get_per_destination_queue(destination).send_presence( states, start_loop=False @@ -896,21 +924,29 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None: else: queue.send_edu(edu) - def send_device_messages(self, destination: str, immediate: bool = True) -> None: - if self.is_mine_server_name(destination): - logger.warning("Not sending device update to ourselves") - return - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - return + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: + destinations = await filter_destinations_by_retry_limiter( + [ + destination + for destination in destinations + if self._federation_shard_config.should_handle( + self._instance_name, destination + ) + and not self.is_mine_server_name(destination) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) - if immediate: - self._get_per_destination_queue(destination).attempt_new_transaction() - else: - self._get_per_destination_queue(destination).mark_new_data() - self._destination_wakeup_queue.add_to_queue(destination) + for destination in destinations: + if immediate: + self._get_per_destination_queue(destination).attempt_new_transaction() + else: + self._get_per_destination_queue(destination).mark_new_data() + self._destination_wakeup_queue.add_to_queue(destination) def wake_destination(self, destination: str) -> None: """Called when we want to retry sending transactions to a remote. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 31c5c2b7dee2..9105ba664c48 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -59,6 +59,10 @@ ) +# If the retry interval is larger than this then we enter "catchup" mode +CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000 + + class PerDestinationQueue: """ Manages the per-destination transmission queues. @@ -370,7 +374,7 @@ async def _transaction_transmission_loop(self) -> None: ), ) - if e.retry_interval > 60 * 60 * 1000: + if e.retry_interval > CATCHUP_RETRY_INTERVAL: # we won't retry for another hour! # (this suggests a significant outage) # We drop pending EDUs because otherwise they will diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5ae427d52cb7..763f56dfc16a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -836,17 +836,16 @@ async def _handle_new_device_update_async(self) -> None: user_id, hosts, ) - for host in hosts: - self.federation_sender.send_device_messages( - host, immediate=False - ) - # TODO: when called, this isn't in a logging context. - # This leads to log spam, sentry event spam, and massive - # memory usage. - # See https://github.com/matrix-org/synapse/issues/12552. - # log_kv( - # {"message": "sent device update to host", "host": host} - # ) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) + # TODO: when called, this isn't in a logging context. + # This leads to log spam, sentry event spam, and massive + # memory usage. + # See https://github.com/matrix-org/synapse/issues/12552. + # log_kv( + # {"message": "sent device update to host", "host": host} + # ) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're @@ -951,8 +950,9 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None: # Notify things that device lists need to be sent out. self.notifier.notify_replication() - for host in potentially_changed_hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages( + potentially_changed_hosts, immediate=False + ) def _update_device_from_client_ips( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 798c7039f9b4..1c79f7a61eb8 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -302,10 +302,9 @@ async def send_device_message( ) if self.federation_sender: - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation_sender.send_device_messages(destination) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + await self.federation_sender.send_device_messages(remote_messages.keys()) async def get_events_for_dehydrated_device( self, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2f841863ae74..f31e18328bcc 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -354,7 +354,9 @@ async def maybe_send_presence_to_interested_destinations( ) for destination, host_states in hosts_to_states.items(): - self._federation.send_presence_to_destinations(host_states, [destination]) + await self._federation.send_presence_to_destinations( + host_states, [destination] + ) async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: """ @@ -936,7 +938,7 @@ async def _update_states( ) for destination, states in hosts_to_states.items(): - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( states, [destination] ) @@ -1508,7 +1510,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non or state.status_msg is not None ] - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=newly_joined_remote_hosts, states=states, ) @@ -1519,7 +1521,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non prev_remote_hosts or newly_joined_remote_hosts ): local_states = await self.current_state_for_users(newly_joined_local_users) - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=prev_remote_hosts | newly_joined_remote_hosts, states=list(local_states.values()), ) @@ -2182,7 +2184,7 @@ def _clear_queue(self) -> None: index = bisect(self._queue, (clear_before,)) self._queue = self._queue[index:] - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Collection[UserPresenceState], destinations: StrCollection ) -> None: """Send the presence states to the given destinations. @@ -2202,7 +2204,7 @@ def send_presence_to_destinations( return if self._federation: - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states, destinations=destinations, ) @@ -2325,7 +2327,7 @@ async def process_replication_rows( for host, user_ids in hosts_to_users.items(): states = await self._presence_handler.current_state_for_users(user_ids) - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states.values(), destinations=[host], ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 7aeae5319c3d..4b4227003d0a 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -26,9 +26,10 @@ ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, StreamKeyType, UserID +from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.wheel_timer import WheelTimer if TYPE_CHECKING: @@ -150,8 +151,15 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None: now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - hosts = await self._storage_controllers.state.get_current_hosts_in_room( - member.room_id + hosts: StrCollection = ( + await self._storage_controllers.state.get_current_hosts_in_room( + member.room_id + ) + ) + hosts = await filter_destinations_by_retry_limiter( + hosts, + clock=self.clock, + store=self.store, ) for domain in hosts: if not self.is_mine_server_name(domain): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 9ad8e038aee2..2f00a7ba2039 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1180,7 +1180,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: # Send to remote destinations. destination = UserID.from_string(user).domain - presence_handler.get_federation_queue().send_presence_to_destinations( + await presence_handler.get_federation_queue().send_presence_to_destinations( presence_events, [destination] ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 3b88dc68eafb..51285e6d3304 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -422,7 +422,7 @@ async def process_replication_rows( # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - send_queue.process_rows_for_federation(self.federation_sender, rows) + await send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) # ... and when new receipts happen @@ -439,16 +439,14 @@ async def process_replication_rows( for row in rows if not row.entity.startswith("@") and not row.is_signature } - for host in hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages(hosts, immediate=False) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local # clients and remote servers, so we ignore entities that start with # '@' (since they'll be local users rather than destinations). hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) + await self.federation_sender.send_device_messages(hosts) async def _on_new_receipts( self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow] diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 860bbf7c0fec..efd21b5bfceb 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast import attr from canonicaljson import encode_canonical_json @@ -28,8 +28,8 @@ LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.types import JsonDict -from synapse.util.caches.descriptors import cached +from synapse.types import JsonDict, StrCollection +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer @@ -205,6 +205,26 @@ def _get_destination_retry_timings( else: return None + @cachedList( + cached_method_name="get_destination_retry_timings", list_name="destinations" + ) + async def get_destination_retry_timings_batch( + self, destinations: StrCollection + ) -> Dict[str, Optional[DestinationRetryTimings]]: + rows = await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), + desc="get_destination_retry_timings_batch", + ) + + return { + row.pop("destination"): DestinationRetryTimings(**row) + for row in rows + if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + } + async def set_destination_retry_timings( self, destination: str, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 9d2065372c42..0e1f907667ce 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -19,6 +19,7 @@ from synapse.api.errors import CodeMessageException from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import DataStore +from synapse.types import StrCollection from synapse.util import Clock if TYPE_CHECKING: @@ -116,6 +117,30 @@ async def get_retry_limiter( ) +async def filter_destinations_by_retry_limiter( + destinations: StrCollection, + clock: Clock, + store: DataStore, + retry_due_within_ms: int = 0, +) -> StrCollection: + """Filter down the list of destinations to only those that will are either + alive or due for a retry (within `retry_due_within_ms`) + """ + if not destinations: + return destinations + + retry_timings = await store.get_destination_retry_timings_batch(destinations) + + now = int(clock.time_msec()) + + return [ + destination + for destination, timings in retry_timings.items() + if timings is None + or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms + ] + + class RetryDestinationLimiter: def __init__( self, diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 7bd3d06859f6..caf04b54cbca 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -75,7 +75,7 @@ def test_send_receipts(self) -> None: thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -111,6 +111,9 @@ def test_send_receipts_thread(self) -> None: # * The same room / user on multiple threads. # * A different user in the same room. sender = self.hs.get_federation_sender() + # Hack so that we have a txn in-flight so we batch up read receipts + # below + sender.wake_destination("host2") for user, thread in ( ("alice", None), ("alice", "thread"), @@ -125,9 +128,7 @@ def test_send_receipts_thread(self) -> None: thread_id=thread, data={"ts": 1234}, ) - self.successResultOf( - defer.ensureDeferred(sender.send_read_receipt(receipt)) - ) + defer.ensureDeferred(sender.send_read_receipt(receipt)) self.pump() @@ -191,7 +192,7 @@ def test_send_receipts_with_backoff(self) -> None: thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -342,7 +343,9 @@ def test_send_device_updates(self) -> None: self.reactor.advance(1) # a second call should produce no new device EDUs - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) self.assertEqual(self.edus, []) # a second device @@ -550,7 +553,9 @@ def test_unreachable_server(self) -> None: # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. @@ -601,7 +606,9 @@ def test_prune_outbound_device_pokes1(self) -> None: # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. @@ -656,7 +663,9 @@ def test_prune_outbound_device_pokes2(self) -> None: # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a987267308ee..88a16193a3b6 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -909,8 +909,14 @@ def test_send_and_get(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -946,11 +952,17 @@ def test_send_and_get_split(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) now_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) rows, upto_token, limited = self.get_success( self.queue.get_replication_rows("master", prev_token, now_token, 10) @@ -989,8 +1001,14 @@ def test_clear_queue_all(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(10 * 60 * 1000) @@ -1005,8 +1023,14 @@ def test_clear_queue_all(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -1033,11 +1057,17 @@ def test_partially_clear_queue(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) self.reactor.advance(2 * 60 * 1000) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(4 * 60 * 1000) @@ -1053,8 +1083,14 @@ def test_partially_clear_queue(self) -> None: prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 43c513b15722..95106ec8f316 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -120,8 +120,6 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.datastore = hs.get_datastores().main - self.datastore.get_destination_retry_timings = AsyncMock(return_value=None) - self.datastore.get_device_updates_by_remote = AsyncMock( # type: ignore[method-assign] return_value=(0, []) )