Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2237 from matrix-org/erikj/sync_key_count
Browse files Browse the repository at this point in the history
Add count of one time keys to sync stream
  • Loading branch information
erikjohnston authored May 23, 2017
2 parents 58c4720 + 8cf9f0a commit fbbc40f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 15 deletions.
11 changes: 11 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
])):
__slots__ = []

Expand Down Expand Up @@ -550,6 +552,14 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
sync_result_builder
)

device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
user_id = sync_config.user.to_string()
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)

defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
Expand All @@ -558,6 +568,7 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))

Expand Down
2 changes: 2 additions & 0 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, db_conn, hs):
_mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__
)
count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]

def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def on_GET(self, request):
"invite": invited,
"leave": archived,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
}

Expand Down
35 changes: 20 additions & 15 deletions synapse/storage/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ def _add_e2e_one_time_keys(txn):
for algorithm, key_id, json_bytes in new_keys
],
)
txn.call_after(
self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
yield self.runInteraction(
"add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
Expand Down Expand Up @@ -237,24 +237,29 @@ def _claim_e2e_one_time_keys(txn):
)
for user_id, device_id, algorithm, key_id in delete:
txn.execute(sql, (user_id, device_id, algorithm, key_id))
txn.call_after(
self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
return result
return self.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)

@defer.inlineCallbacks
def delete_e2e_keys_by_device(self, user_id, device_id):
yield self._simple_delete(
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_device_keys_by_device"
)
yield self._simple_delete(
table="e2e_one_time_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_one_time_keys_by_device"
def delete_e2e_keys_by_device_txn(txn):
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
)
self._simple_delete_txn(
txn,
table="e2e_one_time_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
return self.runInteraction(
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
)
self.count_e2e_one_time_keys.invalidate((user_id, device_id,))

0 comments on commit fbbc40f

Please sign in to comment.