Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't wake up destination transaction queues if they're not due for retry. #16223

Merged
merged 12 commits into from
Sep 4, 2023
1 change: 1 addition & 0 deletions changelog.d/16223.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve resource usage when sending data to a large number of remote hosts that are marked as "down".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably .feature

6 changes: 3 additions & 3 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
# nothing to do here: the replication listener will handle it.

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Expand Down Expand Up @@ -463,7 +463,7 @@ class ParsedFederationStreamData:
edus: Dict[str, List[Edu]]


def process_rows_for_federation(
async def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
Expand Down Expand Up @@ -496,7 +496,7 @@ def process_rows_for_federation(
parsed_row.add_to_buffer(buff)

for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
await transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)

Expand Down
44 changes: 35 additions & 9 deletions synapse/federation/sender/__init__.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to add the filtering to

  • _send_pdu
  • send_presence_to_destinations
  • send_read_receipt

Are there other EDUs we should worry about here? (Typing? Device list stuff? to-device messages?)

What about the other methods on FederationSender?

  • send_edu
  • send_device_messages

Is the point that only the former three handle multiple destinations in one call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Oh there is logic below for typing EDUs. But why don't they go via the federation sender too?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, mainly because a) send_edu and send_device_messages just take a single host, and b) I forgot about device messages 🤦

Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.per_destination_queue import (
CATCHUP_RETRY_INTERVAL,
PerDestinationQueue,
)
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand All @@ -161,9 +164,10 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
Expand Down Expand Up @@ -213,7 +217,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
raise NotImplementedError()

@abc.abstractmethod
def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
Expand Down Expand Up @@ -716,6 +720,13 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
pdu.internal_metadata.stream_ordering,
)

destinations = await filter_destinations_by_retry_limiter(
destinations,
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)

Expand Down Expand Up @@ -763,12 +774,20 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
domains: StrCollection = [
d
for d in domains_set
if not self.is_mine_server_name(d)
and self._federation_shard_config.should_handle(self._instance_name, d)
]

domains = await filter_destinations_by_retry_limiter(
domains,
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

if not domains:
return

Expand Down Expand Up @@ -816,7 +835,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
for queue in queues:
queue.flush_read_receipts_for_room(room_id)

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
Expand All @@ -831,13 +850,20 @@ def send_presence_to_destinations(
for state in states:
assert self.is_mine_id(state.user_id)

destinations = await filter_destinations_by_retry_limiter(
[
d
for d in destinations
if self._federation_shard_config.should_handle(self._instance_name, d)
],
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

for destination in destinations:
if self.is_mine_server_name(destination):
continue
if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue

self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
Expand Down
6 changes: 5 additions & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
)


# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000


class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
Expand Down Expand Up @@ -370,7 +374,7 @@ async def _transaction_transmission_loop(self) -> None:
),
)

if e.retry_interval > 60 * 60 * 1000:
if e.retry_interval > CATCHUP_RETRY_INTERVAL:
# we won't retry for another hour!
# (this suggests a significant outage)
# We drop pending EDUs because otherwise they will
Expand Down
16 changes: 9 additions & 7 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ async def maybe_send_presence_to_interested_destinations(
)

for destination, host_states in hosts_to_states.items():
self._federation.send_presence_to_destinations(host_states, [destination])
await self._federation.send_presence_to_destinations(
host_states, [destination]
)

async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
Expand Down Expand Up @@ -936,7 +938,7 @@ async def _update_states(
)

for destination, states in hosts_to_states.items():
self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
states, [destination]
)

Expand Down Expand Up @@ -1508,7 +1510,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
or state.status_msg is not None
]

self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
Expand All @@ -1519,7 +1521,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
Expand Down Expand Up @@ -2182,7 +2184,7 @@ def _clear_queue(self) -> None:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
Expand All @@ -2202,7 +2204,7 @@ def send_presence_to_destinations(
return

if self._federation:
self._federation.send_presence_to_destinations(
await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
Expand Down Expand Up @@ -2325,7 +2327,7 @@ async def process_replication_rows(

for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
self._federation.send_presence_to_destinations(
await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
14 changes: 11 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
from synapse.types import JsonDict, Requester, StreamKeyType, UserID
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
Expand Down Expand Up @@ -150,8 +151,15 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None:
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)

hosts = await self._storage_controllers.state.get_current_hosts_in_room(
member.room_id
hosts: StrCollection = (
await self._storage_controllers.state.get_current_hosts_in_room(
member.room_id
)
)
hosts = await filter_destinations_by_retry_limiter(
hosts,
clock=self.clock,
store=self.store,
)
for domain in hosts:
if not self.is_mine_server_name(domain):
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:

# Send to remote destinations.
destination = UserID.from_string(user).domain
presence_handler.get_federation_queue().send_presence_to_destinations(
await presence_handler.get_federation_queue().send_presence_to_destinations(
presence_events, [destination]
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ async def process_replication_rows(
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
await send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)

# ... and when new receipts happen
Expand Down
26 changes: 23 additions & 3 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import logging
from enum import Enum
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast

import attr
from canonicaljson import encode_canonical_json
Expand All @@ -28,8 +28,8 @@
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
from synapse.types import JsonDict, StrCollection
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -205,6 +205,26 @@ def _get_destination_retry_timings(
else:
return None

@cachedList(
cached_method_name="get_destination_retry_timings", list_name="destinations"
)
async def get_destination_retry_timings_batch(
self, destinations: StrCollection
) -> Dict[str, Optional[DestinationRetryTimings]]:
rows = await self.db_pool.simple_select_many_batch(
table="destinations",
iterable=destinations,
column="destination",
retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
desc="get_destination_retry_timings_batch",
)

return {
row.pop("destination"): DestinationRetryTimings(**row)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's critical here that the key (and hence the pop) is evaluated before the value!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is a little hacky. I can update it to actually specify each key explicitly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine, it just took me a moment to see what was going on. (And: it's the kind of thing that I'm paranoid might break in a future Python release)

for row in rows
if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
}

async def set_destination_retry_timings(
self,
destination: str,
Expand Down
25 changes: 25 additions & 0 deletions synapse/util/retryutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.errors import CodeMessageException
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.types import StrCollection
from synapse.util import Clock

if TYPE_CHECKING:
Expand Down Expand Up @@ -116,6 +117,30 @@ async def get_retry_limiter(
)


async def filter_destinations_by_retry_limiter(
    destinations: StrCollection,
    clock: Clock,
    store: DataStore,
    retry_due_within_ms: int = 0,
) -> StrCollection:
    """Filter down the list of destinations to only those that are either
    alive or due for a retry (within `retry_due_within_ms`).

    Args:
        destinations: The candidate destinations to filter.
        clock: Used to read the current time in milliseconds.
        store: Used to fetch the retry timings of all destinations in one
            batched lookup.
        retry_due_within_ms: Also keep destinations whose next retry falls
            due no later than this many milliseconds from now.

    Returns:
        `destinations` itself if it is empty. Otherwise a list of the
        destinations that either have no retry timings (`None`) or whose
        `retry_last_ts + retry_interval` is at or before
        `now + retry_due_within_ms`.
    """
    # Nothing to filter, so avoid the store lookup altogether.
    if not destinations:
        return destinations

    timings_by_destination = await store.get_destination_retry_timings_batch(
        destinations
    )

    # Latest "next retry" timestamp that still counts as due.
    cutoff_ms = int(clock.time_msec()) + retry_due_within_ms

    due_destinations = []
    for destination, timings in timings_by_destination.items():
        if timings is not None:
            next_retry_ms = timings.retry_last_ts + timings.retry_interval
            if next_retry_ms > cutoff_ms:
                # Still backing off and not due soon enough: drop it.
                continue
        due_destinations.append(destination)

    return due_destinations
Comment on lines +132 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was gonna say "it might be faster to do this logic in the query"... but I guess that would get in the way of using cachedList to piggyback off the existing cache?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, exactly



class RetryDestinationLimiter:
def __init__(
self,
Expand Down
11 changes: 6 additions & 5 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_send_receipts(self) -> None:
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
self.get_success(sender.send_read_receipt(receipt))

self.pump()

Expand Down Expand Up @@ -111,6 +111,9 @@ def test_send_receipts_thread(self) -> None:
# * The same room / user on multiple threads.
# * A different user in the same room.
sender = self.hs.get_federation_sender()
# Hack so that we have a txn in-flight so we batch up read receipts
# below
sender.wake_destination("host2")
for user, thread in (
("alice", None),
("alice", "thread"),
Expand All @@ -125,9 +128,7 @@ def test_send_receipts_thread(self) -> None:
thread_id=thread,
data={"ts": 1234},
)
self.successResultOf(
defer.ensureDeferred(sender.send_read_receipt(receipt))
)
defer.ensureDeferred(sender.send_read_receipt(receipt))

self.pump()

Expand Down Expand Up @@ -191,7 +192,7 @@ def test_send_receipts_with_backoff(self) -> None:
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
self.get_success(sender.send_read_receipt(receipt))

self.pump()

Expand Down
Loading
Loading