
Track number of hosts affected by the rate limiter #13541

Merged
1 change: 1 addition & 0 deletions changelog.d/13534.misc
@@ -0,0 +1 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
1 change: 1 addition & 0 deletions changelog.d/13541.misc
@@ -0,0 +1 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
Same changelog as #13534 so they merge together.

12 changes: 12 additions & 0 deletions synapse/metrics/__init__.py
@@ -78,6 +78,17 @@ def collect() -> Iterable[Metric]:
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)

T = TypeVar("T")


def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
"""Return the number of items in it for which func returns true."""
n = 0
for x in it:
if func(x):
n += 1
return n


@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
@@ -475,6 +486,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:
"MetricsResource",
"generate_latest",
"start_http_server",
"count",
"LaterGauge",
"InFlightGauge",
"GaugeBucketCollector",
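For reference, the new count helper is a plain predicate-counting fold; a minimal illustration of its behaviour (the literal inputs are made up):

from synapse.metrics import count

count(lambda x: x % 2 == 0, [1, 2, 3, 4])  # -> 2
count(bool, [True, False, True])  # -> 2; the bool-predicate form is what the gauges below use
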
13 changes: 1 addition & 12 deletions synapse/notifier.py
@@ -19,7 +19,6 @@
Callable,
Collection,
Dict,
Iterable,
List,
Optional,
Set,
@@ -40,7 +39,7 @@
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.metrics import LaterGauge, count
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
@@ -68,16 +67,6 @@
T = TypeVar("T")


# TODO(paul): Should be shared somewhere
def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
"""Return the number of items in it for which func returns true."""
n = 0
for x in it:
if func(x):
n += 1
return n


class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
88 changes: 77 additions & 11 deletions synapse/util/ratelimitutils.py
@@ -18,6 +18,8 @@
import typing
from typing import Any, DefaultDict, Iterator, List, Set

from prometheus_client.core import Counter

from twisted.internet import defer

from synapse.api.errors import LimitExceededError
@@ -27,6 +29,7 @@
make_deferred_yieldable,
run_in_background,
)
from synapse.metrics import LaterGauge, count
from synapse.util import Clock

if typing.TYPE_CHECKING:
@@ -35,6 +38,11 @@
logger = logging.getLogger(__name__)


# Track how much the ratelimiter is affecting requests
rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
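# A note on naming, assuming standard prometheus_client behaviour (not
# something this diff sets explicitly): Counters are monotonic and are
# exported with a `_total` suffix, so these surface to Prometheus as
# `synapse_rate_limit_sleep_total` and `synapse_rate_limit_reject_total`.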


class FederationRateLimiter:
def __init__(self, clock: Clock, config: FederationRatelimitSettings):
def new_limiter() -> "_PerHostRatelimiter":
@@ -44,6 +52,34 @@ def new_limiter() -> "_PerHostRatelimiter":
str, "_PerHostRatelimiter"
] = collections.defaultdict(new_limiter)

# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
"synapse_rate_limit_sleep_affected_hosts",
"Number of hosts that had requests put to sleep",
[],
lambda: count(
bool,
[
ratelimiter.should_sleep()
for ratelimiter in self.ratelimiters.values()
],
),
)
LaterGauge(
"synapse_rate_limit_reject_affected_hosts",
"Number of hosts that had requests rejected",
[],
lambda: count(
bool,
[
ratelimiter.should_reject()
for ratelimiter in self.ratelimiters.values()
],
),
)
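        # (A sketch of the collection semantics, assuming the usual
        # behaviour of LaterGauge in synapse.metrics: the callables above
        # are invoked on every Prometheus scrape, so each scrape walks all
        # per-host limiters and reports how many would currently sleep or
        # reject their next request.)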

def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
"""Used to ratelimit an incoming request from a given host

@@ -59,7 +95,7 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]
Returns:
context manager which returns a deferred.
"""
return self.ratelimiters[host].ratelimit()
return self.ratelimiters[host].ratelimit(host)


class _PerHostRatelimiter:
@@ -94,19 +130,36 @@ def __init__(self, clock: Clock, config: FederationRatelimitSettings):
self.request_times: List[int] = []

@contextlib.contextmanager
def ratelimit(self) -> "Iterator[defer.Deferred[None]]":
def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]":
# `contextlib.contextmanager` takes a generator and turns it into a
# context manager. The generator should only yield once with a value
# to be returned by manager.
# Exceptions will be reraised at the yield.

self.host = host

request_id = object()
ret = self._on_enter(request_id)
try:
yield ret
finally:
self._on_exit(request_id)

def should_reject(self) -> bool:
"""
Whether to reject the request if we already have too many queued up
(either sleeping or in the ready queue).
"""
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
return queue_size > self.reject_limit

def should_sleep(self) -> bool:
"""
Whether to sleep the request if we already have too many requests coming
through within the window.
"""
return len(self.request_times) > self.sleep_limit

def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
time_now = self.clock.time_msec()

@@ -117,8 +170,9 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":

# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
retry_after_ms=int(self.window_size / self.sleep_limit)
)
@@ -130,7 +184,8 @@ def queue_request() -> "defer.Deferred[None]":
queue_defer: defer.Deferred[None] = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
logger.info(
"Ratelimiter: queueing request (queue now %i items)",
"Ratelimiter(%s): queueing request (queue now %i items)",
self.host,
len(self.ready_request_queue),
)

@@ -139,19 +194,28 @@ def queue_request() -> "defer.Deferred[None]":
return defer.succeed(None)

logger.debug(
"Ratelimit [%s]: len(self.request_times)=%d",
"Ratelimit(%s) [%s]: len(self.request_times)=%d",
self.host,
id(request_id),
len(self.request_times),
)

if len(self.request_times) > self.sleep_limit:
logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec)
if self.should_sleep():
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
id(request_id),
self.sleep_sec,
)
rate_limit_sleep_counter.inc()
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)

self.sleeping_requests.add(request_id)

def on_wait_finished(_: Any) -> "defer.Deferred[None]":
logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
logger.debug(
"Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id)
)
self.sleeping_requests.discard(request_id)
queue_defer = queue_request()
return queue_defer
@@ -161,7 +225,9 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]":
ret_defer = queue_request()

def on_start(r: object) -> object:
logger.debug("Ratelimit [%s]: Processing req", id(request_id))
logger.debug(
"Ratelimit(%s) [%s]: Processing req", self.host, id(request_id)
)
self.current_processing.add(request_id)
return r

@@ -183,7 +249,7 @@ def on_both(r: object) -> object:
return make_deferred_yieldable(ret_defer)

def _on_exit(self, request_id: object) -> None:
logger.debug("Ratelimit [%s]: Processed req", id(request_id))
logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id))
self.current_processing.discard(request_id)
try:
# start processing the next item on the queue.
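For context, a minimal sketch of how a caller drives this limiter (the handle_request wrapper and origin argument are illustrative, not part of this diff; the real call sites are Synapse's federation servlets). Entering the context manager either raises LimitExceededError (a reject) or hands back a deferred that fires once any enforced sleep has elapsed:

from synapse.util.ratelimitutils import FederationRateLimiter


async def handle_request(limiter: FederationRateLimiter, origin: str) -> None:
    # May raise LimitExceededError if `origin` already has too much queued.
    with limiter.ratelimit(origin) as wait_deferred:
        # The deferred is already wrapped with make_deferred_yieldable in
        # _on_enter above, so it can be awaited directly.
        await wait_deferred
        ...  # process the request from `origin`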