Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed May 10, 2019
1 parent 80b6e1a commit c988c1e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def _get_new_device_messages(self):

last_device_list = self._last_device_list_stream_id
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list
self._destination, last_device_list, MAX_EDUS_PER_TRANSACTION
)
edus.extend(
Edu(
Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_devices_by_user(self, user_id):

defer.returnValue({d["device_id"]: d for d in devices})

def get_devices_by_remote(self, destination, from_stream_id):
def get_devices_by_remote(self, destination, from_stream_id, limit=100):
"""Get stream of updates to send to remote servers
Returns:
Expand All @@ -92,10 +92,11 @@ def get_devices_by_remote(self, destination, from_stream_id):
destination,
from_stream_id,
now_stream_id,
limit,
)

def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id
self, txn, destination, from_stream_id, now_stream_id, limit=100
):
# We retrieve n+1 devices from the list of outbound pokes were n is our
# maximum. We then check if the very last device has the same stream_id as the
Expand All @@ -107,13 +108,12 @@ def _get_devices_by_remote_txn(
# being that such a large device list update is likely an error.
#
# Note: The code below assumes this value is at least 1
maximum_devices = 100
sql = """
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT %d
""" % (maximum_devices + 1)
""" % (limit + 1)
txn.execute(sql, (destination, from_stream_id, now_stream_id, False))

duplicate_updates = [r for r in txn]
Expand Down Expand Up @@ -143,7 +143,7 @@ def _get_devices_by_remote_txn(
# Check if the last and second-to-last row's stream_id's are the same
offending_stream_id = None
if (
len(updates) > maximum_devices and
len(updates) > limit and
updates[-1][2] == updates[-2][2]
):
offending_stream_id = updates[-1][2]
Expand All @@ -159,7 +159,7 @@ def _get_devices_by_remote_txn(
# out, then skip this stream_id
if len(query_map) == 0:
return (now_stream_id + 1, [])
elif len(query_map) >= maximum_devices:
elif len(query_map) >= limit :
now_stream_id = max(stream_id for stream_id in itervalues(query_map))

devices = self._get_e2e_device_keys_txn(
Expand Down

0 comments on commit c988c1e

Please sign in to comment.