Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert throttle params into attrs.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Dec 14, 2020
1 parent e224f5e commit c00eba0
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 17 deletions.
6 changes: 6 additions & 0 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ def as_get(self) -> Dict[str, Any]:
}


@attr.s(slots=True)
class ThrottleParams:
last_sent_ts = attr.ib(type=int)
throttle_ms = attr.ib(type=int)


class Pusher(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self.hs = hs
Expand Down
15 changes: 7 additions & 8 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig
from synapse.push import Pusher, PusherConfig, ThrottleParams
from synapse.push.mailer import Mailer
from synapse.types import RoomStreamToken

Expand Down Expand Up @@ -68,7 +68,7 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer
self.store = self.hs.get_datastore()
self.email = pusher_config.pushkey
self.timed_call = None # type: Optional[DelayedCall]
self.throttle_params = {} # type: Dict[str, Dict[str, int]]
self.throttle_params = {} # type: Dict[str, ThrottleParams]
self._inited = False

self._is_processing = False
Expand Down Expand Up @@ -261,13 +261,13 @@ def seconds_until(self, ts_msec: int) -> float:

def get_room_throttle_ms(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["throttle_ms"]
return self.throttle_params[room_id].throttle_ms
else:
return 0

def get_room_last_sent_ts(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["last_sent_ts"]
return self.throttle_params[room_id].last_sent_ts
else:
return 0

Expand Down Expand Up @@ -318,10 +318,9 @@ async def sent_notif_update_throttle(
new_throttle_ms = min(
current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
)
self.throttle_params[room_id] = {
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
self.throttle_params[room_id] = ThrottleParams(
self.clock.time_msec(), new_throttle_ms,
)
assert self.pusher_id is not None
await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from typing import TYPE_CHECKING, Callable, Dict, Optional

from synapse.push import Pusher, PusherConfig
from synapse.push.emailpusher import EmailPusher
Expand Down
15 changes: 7 additions & 8 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from canonicaljson import encode_canonical_json

from synapse.push import PusherConfig
from synapse.push import PusherConfig, ThrottleParams
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.types import Connection
Expand Down Expand Up @@ -253,7 +253,7 @@ async def update_pusher_failing_since(

async def get_throttle_params_by_room(
self, pusher_id: str
) -> Dict[str, Dict[str, int]]:
) -> Dict[str, ThrottleParams]:
res = await self.db_pool.simple_select_list(
"pusher_throttle",
{"pusher": pusher_id},
Expand All @@ -263,22 +263,21 @@ async def get_throttle_params_by_room(

params_by_room = {}
for row in res:
params_by_room[row["room_id"]] = {
"last_sent_ts": row["last_sent_ts"],
"throttle_ms": row["throttle_ms"],
}
params_by_room[row["room_id"]] = ThrottleParams(
row["last_sent_ts"], row["throttle_ms"],
)

return params_by_room

async def set_throttle_params(
self, pusher_id: str, room_id: str, params: Dict[str, Any]
self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None:
# no need to lock because `pusher_throttle` has a primary key on
# (pusher, room_id) so simple_upsert will retry
await self.db_pool.simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
params,
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params",
lock=False,
)
Expand Down

0 comments on commit c00eba0

Please sign in to comment.