Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Support enabling/disabling pushers (from MSC3881) (#13799)
Browse files Browse the repository at this point in the history
Partial implementation of MSC3881
  • Loading branch information
babolivier authored Sep 21, 2022
1 parent 6bd8763 commit 8ae42ab
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 71 deletions.
1 change: 1 addition & 0 deletions changelog.d/13799.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"e2e_fallback_keys_json": ["used"],
"access_tokens": ["used"],
"device_lists_changes_in_room": ["converted_to_destinations"],
"pushers": ["enabled"],
}


Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)

# MSC3881: Remotely toggle push notifications for another client
self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
4 changes: 2 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,15 +997,15 @@ async def _register_email_threepid(
assert user_tuple
token_id = user_tuple.token_id

await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
device_display_name=threepid["address"],
pushkey=threepid["address"],
lang=None, # We don't know a user's language here
lang=None,
data={},
)

Expand Down
2 changes: 2 additions & 0 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PusherConfig:
last_stream_ordering: int
last_success: Optional[int]
failing_since: Optional[int]
enabled: bool

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
Expand All @@ -128,6 +129,7 @@ def as_dict(self) -> Dict[str, Any]:
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
"enabled": self.enabled,
}


Expand Down
81 changes: 61 additions & 20 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def start(self) -> None:
return
run_as_background_process("start_pushers", self._start_pushers)

async def add_pusher(
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
Expand All @@ -106,6 +106,7 @@ async def add_pusher(
lang: Optional[str],
data: JsonDict,
profile_tag: str = "",
enabled: bool = True,
) -> Optional[Pusher]:
"""Creates a new pusher and adds it to the pool
Expand Down Expand Up @@ -147,9 +148,20 @@ async def add_pusher(
last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
enabled=enabled,
)
)

# Before we actually persist the pusher, we check if the user already has one
# for this app ID and pushkey. If so, we want to keep the access token in place,
# since this could be one device modifying (e.g. enabling/disabling) another
# device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
Expand All @@ -163,8 +175,9 @@ async def add_pusher(
data=data,
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
enabled=enabled,
)
pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

return pusher

Expand Down Expand Up @@ -276,10 +289,25 @@ async def on_new_receipts(
except Exception:
logger.exception("Exception in pusher on_new_receipts")

async def start_pusher_by_id(
async def _get_pusher_config_for_user_by_app_id_and_pushkey(
self, user_id: str, app_id: str, pushkey: str
) -> Optional[PusherConfig]:
resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r

return pusher_config

async def process_pusher_change_by_id(
self, app_id: str, pushkey: str, user_id: str
) -> Optional[Pusher]:
"""Look up the details for the given pusher, and start it
"""Look up the details for the given pusher, and either start it if its
"enabled" flag is True, or try to stop it otherwise.
If the pusher is new and its "enabled" flag is False, the stop is a noop.
Returns:
The pusher started, if any
Expand All @@ -290,12 +318,13 @@ async def start_pusher_by_id(
if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
return None

resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
pusher_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r
if pusher_config and not pusher_config.enabled:
self.maybe_stop_pusher(app_id, pushkey, user_id)
return None

pusher = None
if pusher_config:
Expand All @@ -305,7 +334,7 @@ async def start_pusher_by_id(

async def _start_pushers(self) -> None:
"""Start all the pushers"""
pushers = await self.store.get_all_pushers()
pushers = await self.store.get_enabled_pushers()

# Stagger starting up the pushers so we don't completely drown the
# process on start up.
Expand Down Expand Up @@ -363,6 +392,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()

logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)

# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
Expand All @@ -382,16 +413,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
return pusher

async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
self.maybe_stop_pusher(app_id, pushkey, user_id)

# We can only delete pushers on master.
if self._remove_pusher_client:
Expand All @@ -402,3 +424,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.
Args:
app_id: the pusher's app ID.
pushkey: the pusher's push key.
user_id: the user the pusher belongs to. Only used for logging.
"""
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
10 changes: 7 additions & 3 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ async def on_rdata(
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
await self.process_pusher_change(
row.user_id, row.app_id, row.pushkey
)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down Expand Up @@ -334,13 +336,15 @@ def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
async def process_pusher_change(
self, user_id: str, app_id: str, pushkey: str
) -> None:
if not self._notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)


class FederationSenderHandler:
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,15 @@ async def on_PUT(
and self.hs.config.email.email_notif_for_new_users
and medium == "email"
):
await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
device_display_name=address,
pushkey=address,
lang=None, # We don't know a user's language here
lang=None,
data={},
)

Expand Down
18 changes: 15 additions & 3 deletions synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand All @@ -51,9 +52,14 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user.to_string()
)

filtered_pushers = [p.as_dict() for p in pushers]
pusher_dicts = [p.as_dict() for p in pushers]

return 200, {"pushers": filtered_pushers}
for pusher in pusher_dicts:
if self._msc3881_enabled:
pusher["org.matrix.msc3881.enabled"] = pusher["enabled"]
del pusher["enabled"]

return 200, {"pushers": pusher_dicts}


class PushersSetRestServlet(RestServlet):
Expand All @@ -65,6 +71,7 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
self.pusher_pool = self.hs.get_pusherpool()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand Down Expand Up @@ -103,6 +110,10 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if "append" in content:
append = content["append"]

enabled = True
if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content:
enabled = content["org.matrix.msc3881.enabled"]

if not append:
await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content["app_id"],
Expand All @@ -111,7 +122,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
)

try:
await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
Expand All @@ -122,6 +133,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
lang=content["lang"],
data=content["data"],
profile_tag=content.get("profile_tag", ""),
enabled=enabled,
)
except PusherConfigException as pce:
raise SynapseError(
Expand Down
Loading

0 comments on commit 8ae42ab

Please sign in to comment.