Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert internal pusher dicts to attrs classes. (#8940)
Browse files Browse the repository at this point in the history
This improves type hinting and should use less memory.
  • Loading branch information
clokep authored Dec 16, 2020
1 parent 7a33285 commit bd30cfe
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 204 deletions.
1 change: 1 addition & 0 deletions changelog.d/8940.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to push module.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ files =
synapse/state,
synapse/storage/databases/main/appservice.py,
synapse/storage/databases/main/events.py,
synapse/storage/databases/main/pusher.py,
synapse/storage/databases/main/registration.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
Expand Down
60 changes: 53 additions & 7 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,70 @@
# limitations under the License.

import abc
from typing import TYPE_CHECKING, Any, Dict
from typing import TYPE_CHECKING, Any, Dict, Optional

from synapse.types import RoomStreamToken
import attr

from synapse.types import JsonDict, RoomStreamToken

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer


@attr.s(slots=True)
class PusherConfig:
"""Parameters necessary to configure a pusher."""

id = attr.ib(type=Optional[str])
user_name = attr.ib(type=str)
access_token = attr.ib(type=Optional[int])
profile_tag = attr.ib(type=str)
kind = attr.ib(type=str)
app_id = attr.ib(type=str)
app_display_name = attr.ib(type=str)
device_display_name = attr.ib(type=str)
pushkey = attr.ib(type=str)
ts = attr.ib(type=int)
lang = attr.ib(type=Optional[str])
data = attr.ib(type=Optional[JsonDict])
last_stream_ordering = attr.ib(type=Optional[int])
last_success = attr.ib(type=Optional[int])
failing_since = attr.ib(type=Optional[int])

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
"app_display_name": self.app_display_name,
"app_id": self.app_id,
"data": self.data,
"device_display_name": self.device_display_name,
"kind": self.kind,
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
}


@attr.s(slots=True)
class ThrottleParams:
"""Parameters for controlling the rate of sending pushes via email."""

last_sent_ts = attr.ib(type=int)
throttle_ms = attr.ib(type=int)


class Pusher(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self.hs = hs
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()

self.pusher_id = pusherdict["id"]
self.user_id = pusherdict["user_name"]
self.app_id = pusherdict["app_id"]
self.pushkey = pusherdict["pushkey"]
self.pusher_id = pusher_config.id
self.user_id = pusher_config.user_name
self.app_id = pusher_config.app_id
self.pushkey = pusher_config.pushkey

self.last_stream_ordering = pusher_config.last_stream_ordering

# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
Expand Down
27 changes: 14 additions & 13 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional

from twisted.internet.base import DelayedCall
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher
from synapse.push import Pusher, PusherConfig, ThrottleParams
from synapse.push.mailer import Mailer

if TYPE_CHECKING:
Expand Down Expand Up @@ -60,15 +60,14 @@ class EmailPusher(Pusher):
factor out the common parts
"""

def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any], mailer: Mailer):
super().__init__(hs, pusherdict)
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer):
super().__init__(hs, pusher_config)
self.mailer = mailer

self.store = self.hs.get_datastore()
self.email = pusherdict["pushkey"]
self.last_stream_ordering = pusherdict["last_stream_ordering"]
self.email = pusher_config.pushkey
self.timed_call = None # type: Optional[DelayedCall]
self.throttle_params = {} # type: Dict[str, Dict[str, int]]
self.throttle_params = {} # type: Dict[str, ThrottleParams]
self._inited = False

self._is_processing = False
Expand Down Expand Up @@ -132,6 +131,7 @@ async def _process(self) -> None:

if not self._inited:
# this is our first loop: load up the throttle params
assert self.pusher_id is not None
self.throttle_params = await self.store.get_throttle_params_by_room(
self.pusher_id
)
Expand All @@ -157,6 +157,7 @@ async def _unsafe_process(self) -> None:
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
assert start is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering
)
Expand Down Expand Up @@ -244,13 +245,13 @@ def seconds_until(self, ts_msec: int) -> float:

def get_room_throttle_ms(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["throttle_ms"]
return self.throttle_params[room_id].throttle_ms
else:
return 0

def get_room_last_sent_ts(self, room_id: str) -> int:
if room_id in self.throttle_params:
return self.throttle_params[room_id]["last_sent_ts"]
return self.throttle_params[room_id].last_sent_ts
else:
return 0

Expand Down Expand Up @@ -301,10 +302,10 @@ async def sent_notif_update_throttle(
new_throttle_ms = min(
current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
)
self.throttle_params[room_id] = {
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
self.throttle_params[room_id] = ThrottleParams(
self.clock.time_msec(), new_throttle_ms,
)
assert self.pusher_id is not None
await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
)
Expand Down
36 changes: 18 additions & 18 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.events import EventBase
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfigException
from synapse.push import Pusher, PusherConfig, PusherConfigException

from . import push_rule_evaluator, push_tools

Expand Down Expand Up @@ -62,33 +62,29 @@ class HttpPusher(Pusher):
# This one's in ms because we compare it against the clock
GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000

def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
super().__init__(hs, pusherdict)
def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
super().__init__(hs, pusher_config)
self.storage = self.hs.get_storage()
self.app_display_name = pusherdict["app_display_name"]
self.device_display_name = pusherdict["device_display_name"]
self.pushkey_ts = pusherdict["ts"]
self.data = pusherdict["data"]
self.last_stream_ordering = pusherdict["last_stream_ordering"]
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict["failing_since"]
self.failing_since = pusher_config.failing_since
self.timed_call = None
self._is_processing = False
self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room

if "data" not in pusherdict:
raise PusherConfigException("No 'data' key for HTTP pusher")
self.data = pusherdict["data"]
self.data = pusher_config.data
if self.data is None:
raise PusherConfigException("'data' key can not be null for HTTP pusher")

self.name = "%s/%s/%s" % (
pusherdict["user_name"],
pusherdict["app_id"],
pusherdict["pushkey"],
pusher_config.user_name,
pusher_config.app_id,
pusher_config.pushkey,
)

if self.data is None:
raise PusherConfigException("data can not be null for HTTP pusher")

# Validate that there's a URL and it is of the proper form.
if "url" not in self.data:
raise PusherConfigException("'url' required in data for HTTP pusher")
Expand Down Expand Up @@ -180,6 +176,7 @@ async def _unsafe_process(self) -> None:
Never call this directly: use _process which will only allow this to
run once per pusher.
"""
assert self.last_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
Expand Down Expand Up @@ -208,6 +205,7 @@ async def _unsafe_process(self) -> None:
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
assert self.last_stream_ordering is not None
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
Expand Down Expand Up @@ -314,6 +312,8 @@ async def _build_notification_dict(
# or may do so (i.e. is encrypted so has unknown effects).
priority = "high"

# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
d = {
"notification": {
Expand Down
24 changes: 12 additions & 12 deletions synapse/push/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from typing import TYPE_CHECKING, Callable, Dict, Optional

from synapse.push import Pusher
from synapse.push import Pusher, PusherConfig
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.mailer import Mailer
Expand All @@ -34,7 +34,7 @@ def __init__(self, hs: "HomeServer"):

self.pusher_types = {
"http": HttpPusher
} # type: Dict[str, Callable[[HomeServer, dict], Pusher]]
} # type: Dict[str, Callable[[HomeServer, PusherConfig], Pusher]]

logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
if hs.config.email_enable_notifs:
Expand All @@ -47,18 +47,18 @@ def __init__(self, hs: "HomeServer"):

logger.info("defined email pusher type")

def create_pusher(self, pusherdict: Dict[str, Any]) -> Optional[Pusher]:
kind = pusherdict["kind"]
def create_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
kind = pusher_config.kind
f = self.pusher_types.get(kind, None)
if not f:
return None
logger.debug("creating %s pusher for %r", kind, pusherdict)
return f(self.hs, pusherdict)
logger.debug("creating %s pusher for %r", kind, pusher_config)
return f(self.hs, pusher_config)

def _create_email_pusher(
self, _hs: "HomeServer", pusherdict: Dict[str, Any]
self, _hs: "HomeServer", pusher_config: PusherConfig
) -> EmailPusher:
app_name = self._app_name_from_pusherdict(pusherdict)
app_name = self._app_name_from_pusherdict(pusher_config)
mailer = self.mailers.get(app_name)
if not mailer:
mailer = Mailer(
Expand All @@ -68,10 +68,10 @@ def _create_email_pusher(
template_text=self._notif_template_text,
)
self.mailers[app_name] = mailer
return EmailPusher(self.hs, pusherdict, mailer)
return EmailPusher(self.hs, pusher_config, mailer)

def _app_name_from_pusherdict(self, pusherdict: Dict[str, Any]) -> str:
data = pusherdict["data"]
def _app_name_from_pusherdict(self, pusher_config: PusherConfig) -> str:
data = pusher_config.data

if isinstance(data, dict):
brand = data.get("brand")
Expand Down
Loading

0 comments on commit bd30cfe

Please sign in to comment.