Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement device lists updates over federation #1857

Merged
merged 19 commits into from
Jan 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep
Expand All @@ -56,7 +57,7 @@

class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore,
SlavedRegistrationStore, SlavedDeviceStore,
):
pass

Expand Down
26 changes: 25 additions & 1 deletion synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
Expand Down Expand Up @@ -77,6 +78,7 @@ class SynchrotronSlavedStore(
SlavedFilteringStore,
SlavedPresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
RoomStore,
BaseSlavedStore,
ClientIpStore, # After BaseSlavedStore because the constructor is different
Expand Down Expand Up @@ -380,6 +382,27 @@ def notify_from_stream(
stream_key, position, users=users, rooms=rooms
)

@defer.inlineCallbacks
def notify_device_list_update(result):
stream = result.get("device_lists")
if not stream:
return

position_index = stream["field_names"].index("position")
user_index = stream["field_names"].index("user_id")

for row in stream["rows"]:
position = row[position_index]
user_id = row[user_index]

rooms = yield store.get_rooms_for_user(user_id)
room_ids = [r.room_id for r in rooms]

notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
)

@defer.inlineCallbacks
def notify(result):
stream = result.get("events")
if stream:
Expand Down Expand Up @@ -417,6 +440,7 @@ def notify(result):
notify_from_stream(
result, "to_device", "to_device_key", user="user_id"
)
yield notify_device_list_update(result)

while True:
try:
Expand All @@ -427,7 +451,7 @@ def notify(result):
yield store.process_replication(result)
typing_handler.process_replication(result)
yield presence_handler.process_replication(result)
notify(result)
yield notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
Expand Down
10 changes: 10 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ def query_client_keys(self, destination, content, timeout):
destination, content, timeout
)

@log_function
def query_user_devices(self, destination, user_id, timeout=30000):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.inc("user_devices")
return self.transport_layer.query_user_devices(
destination, user_id, timeout
)

@log_function
def claim_client_keys(self, destination, content, timeout):
"""Claims one-time keys for a device hosted on a remote server.
Expand Down
3 changes: 3 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ def on_query_auth_request(self, origin, content, room_id, event_id):
def on_query_client_keys(self, origin, content):
return self.on_query_request("client_keys", content)

def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)

@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
Expand Down
133 changes: 76 additions & 57 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(self, hs):
self.pending_failures_by_dest = {}

self.last_device_stream_id_by_dest = {}
self.last_device_list_stream_id_by_dest = {}

# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
Expand Down Expand Up @@ -305,62 +306,74 @@ def _attempt_new_transaction(self, destination):
yield run_on_reactor()

while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])

pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)

limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
)
limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
)

device_message_edus, device_stream_id = (
yield self._get_new_device_messages(destination)
)
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)

pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)

if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))

if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
)
return

if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
)
if success:
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
return

success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
device_stream_id,
should_delete_from_device_stream=bool(device_message_edus),
limiter=limiter,
)
if not success:
break
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination:
logger.debug(
"TX [%s] not ready for retry yet - "
Expand All @@ -387,13 +400,26 @@ def _get_new_device_messages(self, destination):
)
for content in contents
]
defer.returnValue((edus, stream_id))

last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
now_stream_id, results = yield self.store.get_devices_by_remote(
destination, last_device_list
)
edus.extend(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.device_list_update",
content=content,
)
for content in results
)
defer.returnValue((edus, stream_id, now_stream_id))

@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, device_stream_id,
should_delete_from_device_stream, limiter):
pending_failures, limiter):

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
Expand Down Expand Up @@ -504,13 +530,6 @@ def json_data_cb():
"Failed to send event %s to %s", p.event_id, destination
)
success = False
else:
# Remove the acknowledged device messages from the database
if should_delete_from_device_stream:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
Expand Down
26 changes: 26 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,32 @@ def query_client_keys(self, destination, query_content, timeout):
)
defer.returnValue(content)

@defer.inlineCallbacks
@log_function
def query_user_devices(self, destination, user_id, timeout):
"""Query the devices for a user id hosted on a remote server.

Response:
{
"stream_id": "...",
"devices": [ { ... } ]
}

Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
"""
path = PREFIX + "/user/devices/" + user_id

content = yield self.client.get_json(
destination=destination,
path=path,
timeout=timeout,
)
defer.returnValue(content)

@defer.inlineCallbacks
@log_function
def claim_client_keys(self, destination, query_content, timeout):
Expand Down
8 changes: 8 additions & 0 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ def on_POST(self, origin, content, query):
return self.handler.on_query_client_keys(origin, content)


class FederationUserDevicesQueryServlet(BaseFederationServlet):
PATH = "/user/devices/(?P<user_id>[^/]*)"

def on_GET(self, origin, content, query, user_id):
return self.handler.on_query_user_devices(origin, user_id)


class FederationClientKeysClaimServlet(BaseFederationServlet):
PATH = "/user/keys/claim"

Expand Down Expand Up @@ -613,6 +620,7 @@ def on_GET(self, origin, content, query):
FederationGetMissingEventsServlet,
FederationEventAuthServlet,
FederationClientKeysQueryServlet,
FederationUserDevicesQueryServlet,
FederationClientKeysClaimServlet,
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
Expand Down
Loading