Skip to content

Commit

Permalink
Define delayed event ratelimit category
Browse files Browse the repository at this point in the history
Apply ratelimiting on delayed event management separately from messages.
  • Loading branch information
AndrewFerr committed Dec 9, 2024
1 parent adce8a0 commit f3688b5
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 3 deletions.
3 changes: 3 additions & 0 deletions demo/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ for port in 8080 8081 8082; do
per_user:
per_second: 1000
burst_count: 1000
rc_delayed_event:
per_second: 1000
burst_count: 1000
RC
)
echo "${ratelimiting}" >> "$port.config"
Expand Down
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ rc_invites:
per_second: 1000
burst_count: 1000

rc_delayed_event:
per_second: 9999
burst_count: 9999

federation_rr_transactions_per_room_per_second: 9999

allow_device_name_lookup_over_federation: true
Expand Down
19 changes: 19 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,25 @@ rc_federation:
concurrent: 5
```
---
### `rc_delayed_event`


Ratelimiting settings for delayed event management.

This is a ratelimiting option that ratelimits
attempts to restart, cancel, or view delayed events
based on the account the client is using.
It defaults to: `per_second: 10`, `burst_count: 100`.

Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.

Example configuration:
```yaml
rc_delayed_event:
per_second: 5
burst_count: 50
```
---
### `federation_rr_transactions_per_room_per_second`

Sets outgoing federation transaction frequency for sending read-receipts,
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
config.get("remote_media_download_burst_count", "500M")
),
)

self.rc_delayed_event = RatelimitSettings.parse(
config,
"rc_delayed_event",
defaults={"per_second": 10, "burst_count": 100},
)
11 changes: 8 additions & 3 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, hs: "HomeServer"):
self._config = hs.config
self._clock = hs.get_clock()
self._request_ratelimiter = hs.get_request_ratelimiter()
self._delayed_event_ratelimiter = hs.get_delayed_event_ratelimiter()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()

Expand Down Expand Up @@ -227,6 +228,8 @@ async def add(
Raises:
SynapseError: if the delayed event fails validation checks.
"""
# Use standard request limiter for scheduling new delayed events.
# TODO: Instead apply rateliming based on the scheduled send time.
await self._request_ratelimiter.ratelimit(requester)

self._event_creation_handler.validator.validate_builder(
Expand Down Expand Up @@ -285,7 +288,7 @@ async def cancel(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
await self._initialized_from_db

next_send_ts = await self._store.cancel_delayed_event(
Expand All @@ -308,7 +311,7 @@ async def restart(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
await self._initialized_from_db

next_send_ts = await self._store.restart_delayed_event(
Expand All @@ -332,6 +335,8 @@ async def send(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
# Use standard request limiter for sending delayed events on-demand,
# as an on-demand send is similar to sending a regular event.
await self._request_ratelimiter.ratelimit(requester)
await self._initialized_from_db

Expand Down Expand Up @@ -415,7 +420,7 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:

async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
"""Return all pending delayed events requested by the given user."""
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
return await self._store.get_all_delayed_events_for_user(
requester.user.localpart
)
Expand Down
8 changes: 8 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,14 @@ def get_request_ratelimiter(self) -> RequestRatelimiter:
self.config.ratelimiting.rc_admin_redaction,
)

@cache_in_self
def get_delayed_event_ratelimiter(self) -> Ratelimiter:
return Ratelimiter(
store=self.get_datastores().main,
clock=self.get_clock(),
cfg=self.config.ratelimiting.rc_delayed_event,
)

@cache_in_self
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
Expand Down
138 changes: 138 additions & 0 deletions tests/rest/client/test_delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ def test_delayed_state_events_are_sent_on_timeout(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config({"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}})
def test_get_delayed_events_ratelimit(self) -> None:
args = ("GET", PATH_PREFIX)

channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

channel = self.make_request(*args)
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
self.get_success(
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
)

# Test that the request isn't ratelimited anymore.
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_update_delayed_event_without_id(self) -> None:
channel = self.make_request(
"POST",
Expand Down Expand Up @@ -206,6 +225,44 @@ def test_cancel_delayed_state_event(self) -> None:
expect_code=HTTPStatus.NOT_FOUND,
)

@unittest.override_config({"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}})
def test_cancel_delayed_event_ratelimit(self) -> None:
delay_ids = []
for i in range(2):
channel = self.make_request(
"POST",
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
{},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
delay_id = channel.json_body.get("delay_id")
self.assertIsNotNone(delay_id)
delay_ids.append(delay_id)

channel = self.make_request(
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "cancel"},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

args = (
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "cancel"},
)
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
self.get_success(
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
)

# Test that the request isn't ratelimited anymore.
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_send_delayed_state_event(self) -> None:
state_key = "to_send_on_request"

Expand Down Expand Up @@ -250,6 +307,44 @@ def test_send_delayed_state_event(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config({"rc_message": {"per_second": 3, "burst_count": 4}})
def test_send_delayed_event_ratelimit(self) -> None:
delay_ids = []
for i in range(2):
channel = self.make_request(
"POST",
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
{},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
delay_id = channel.json_body.get("delay_id")
self.assertIsNotNone(delay_id)
delay_ids.append(delay_id)

channel = self.make_request(
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "send"},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

args = (
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "send"},
)
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
self.get_success(
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
)

# Test that the request isn't ratelimited anymore.
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_restart_delayed_state_event(self) -> None:
state_key = "to_send_on_restarted_timeout"

Expand Down Expand Up @@ -309,6 +404,44 @@ def test_restart_delayed_state_event(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config({"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}})
def test_restart_delayed_event_ratelimit(self) -> None:
delay_ids = []
for i in range(2):
channel = self.make_request(
"POST",
_get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
{},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
delay_id = channel.json_body.get("delay_id")
self.assertIsNotNone(delay_id)
delay_ids.append(delay_id)

channel = self.make_request(
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "restart"},
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

args = (
"POST",
f"{PATH_PREFIX}/{delay_ids.pop(0)}",
{"action": "restart"},
)
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
self.get_success(
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
)

# Test that the request isn't ratelimited anymore.
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
state_key = "to_be_cancelled"

Expand Down Expand Up @@ -374,3 +507,8 @@ def _get_path_for_delayed_state(
room_id: str, event_type: str, state_key: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"

def _get_path_for_delayed_send(
room_id: str, event_type: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"
34 changes: 34 additions & 0 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2382,6 +2382,40 @@ def test_send_delayed_state_event(self) -> None:
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

@unittest.override_config({
"max_event_delay_duration": "24h",
"rc_message": {"per_second": 1, "burst_count": 2},
})
def test_add_delayed_event_ratelimit(self) -> None:
"""Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
which ratelimits them correctly, including by not limiting when the requester is
exempt from ratelimiting.
"""

# Test that new delayed events are correctly ratelimited.
args = (
"POST",
(
"rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000"
% self.room_id
).encode("ascii"),
{"body": "test", "msgtype": "m.text"},
)
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

# Add the current user to the ratelimit overrides, allowing them no ratelimiting.
self.get_success(
self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
)

# Test that the new delayed events aren't ratelimited anymore.
channel = self.make_request(*args)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)



class RoomSearchTestCase(unittest.HomeserverTestCase):
servlets = [
Expand Down

0 comments on commit f3688b5

Please sign in to comment.