Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add count of one time keys to sync stream #2237

Merged
merged 5 commits into from
May 23, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
])):
__slots__ = []

Expand Down Expand Up @@ -550,6 +552,14 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
sync_result_builder
)

device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
user_id = sync_config.user.to_string()
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)

defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
Expand All @@ -558,6 +568,7 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token,
))

Expand Down
2 changes: 2 additions & 0 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, db_conn, hs):
_mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__
)
count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]

def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions()
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def on_GET(self, request):
"invite": invited,
"leave": archived,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
}

Expand Down
31 changes: 18 additions & 13 deletions synapse/storage/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ def _add_e2e_one_time_keys(txn):
for algorithm, key_id, json_bytes in new_keys
],
)
txn.call_after(
self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
yield self.runInteraction(
"add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
Expand Down Expand Up @@ -245,16 +245,21 @@ def _claim_e2e_one_time_keys(txn):
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)

@defer.inlineCallbacks
def delete_e2e_keys_by_device(self, user_id, device_id):
yield self._simple_delete(
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_device_keys_by_device"
)
yield self._simple_delete(
table="e2e_one_time_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_one_time_keys_by_device"
def delete_e2e_keys_by_device_txn(txn):
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
)
self._simple_delete_txn(
txn,
table="e2e_one_time_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
return self.runInteraction(
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
)
self.count_e2e_one_time_keys.invalidate((user_id, device_id,))