Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Regularly try to wake up dests instead of waiting for next PDU/EDU (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten authored Jun 16, 2023
1 parent d939120 commit f63d4a3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 31 deletions.
1 change: 1 addition & 0 deletions changelog.d/15743.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying.
34 changes: 16 additions & 18 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,8 @@
If a remote server is unreachable over federation, we back off from that server,
with an exponentially-increasing retry interval.
Whilst we don't automatically retry after the interval, we prevent making new attempts
until such time as the back-off has cleared.
Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
loop resumes and empties the queue by making federation requests.
We automatically retry after the retry interval expires (roughly, the logic to do so
being triggered every minute).
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
unbounded growth) and Catch-Up Mode is entered.
Expand Down Expand Up @@ -145,7 +143,6 @@
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.presence import UserPresenceState
Expand Down Expand Up @@ -184,14 +181,18 @@
"Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. This will also be the delay applied at startup
# before trying the same.
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be wake up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5
# will be woken up every <x> seconds until we have woken every destination
# has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5


class AbstractFederationSender(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -415,12 +416,10 @@ def __init__(self, hs: "HomeServer"):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)

# wake up destinations that have outstanding PDUs to be caught up
self._catchup_after_startup_timer: Optional[
IDelayedCall
] = self.clock.call_later(
CATCH_UP_STARTUP_DELAY_SEC,
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call(
run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
Expand Down Expand Up @@ -966,7 +965,6 @@ async def _wake_destinations_needing_catchup(self) -> None:

if not destinations_to_wake:
# finished waking all destinations!
self._catchup_after_startup_timer = None
break

last_processed = destinations_to_wake[-1]
Expand All @@ -983,4 +981,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
last_processed,
)
self.wake_destination(destination)
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
22 changes: 9 additions & 13 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,28 +431,24 @@ def test_catch_up_on_synapse_startup(self) -> None:
# ACT: call _wake_destinations_needing_catchup

# patch wake_destination to just count the destinations instead
woken = []
woken = set()

def wake_destination_track(destination: str) -> None:
woken.append(destination)
woken.add(destination)

self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment]

# cancel the pre-existing timer for _wake_destinations_needing_catchup
# this is because we are calling it manually rather than waiting for it
# to be called automatically
assert self.federation_sender._catchup_after_startup_timer is not None
self.federation_sender._catchup_after_startup_timer.cancel()

self.get_success(
self.federation_sender._wake_destinations_needing_catchup(), by=5.0
)
# We wait quite long so that all dests can be woken up, since there is a delay
# between them.
self.pump(by=5.0)

# ASSERT (_wake_destinations_needing_catchup):
# - all remotes are woken up, save for zzzerver
self.assertNotIn("zzzerver", woken)
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])
# - all destinations are woken, potentially more than once, since the
# wake up is called regularly and we don't ack in this test that a transaction
# has been successfully sent.
self.assertCountEqual(woken, set(server_names[:-1]))

def test_not_latest_event(self) -> None:
"""Test that we send the latest event in the room even if its not ours."""
Expand Down

0 comments on commit f63d4a3

Please sign in to comment.