Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Wire up SendToDeviceRestServlet to work on workers
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 7, 2021
1 parent eb6121a commit 7d43cb7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
Expand Down Expand Up @@ -520,6 +521,8 @@ def _listen_http(self, listener_config: ListenerConfig):
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)

SendToDeviceRestServlet(self).register(resource)

user_directory.register_servlets(self, resource)

# If presence is disabled, use the stub servlet that does
Expand Down
29 changes: 21 additions & 8 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,23 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
# We only need to poke the federation sender explicitly if its on the
# same instance.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()

# If we can handle the to device EDUs we do so, otherwise we route them
# to the appropriate worker.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
else:
hs.get_federation_registry().register_instances_for_edu(
"m.direct_to_device", hs.config.worker.writers.to_device,
)

if hs.config.worker.worker_app is None:
self._user_device_resync = (
Expand Down Expand Up @@ -201,7 +213,8 @@ async def send_device_message(
)

log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation.send_device_messages(destination)
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
9 changes: 9 additions & 0 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
EventsStream,
FederationStream,
Stream,
ToDeviceStream,
TypingStream,
)

Expand Down Expand Up @@ -115,6 +116,14 @@ def __init__(self, hs):

continue

if isinstance(stream, ToDeviceStream):
# Only add ToDeviceStream as a source on instances in charge of
# sending to device messages.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
self._streams_to_replicate.append(stream)

continue

if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
Expand Down

0 comments on commit 7d43cb7

Please sign in to comment.