Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Change device list streams to have one row per ID #7010

Merged
merged 8 commits into from
Mar 19, 2020
8 changes: 7 additions & 1 deletion synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ def __init__(self, database: Database, db_conn, hs):
self.hs = hs

self._device_list_id_gen = SlavedIdTracker(
db_conn, "device_lists_stream", "stream_id"
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
],
)
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/data_stores/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ def __init__(self, database: Database, db_conn, hs):
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[("user_signature_stream", "stream_id")],
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
],
)
self._cross_signing_id_gen = StreamIdGenerator(
db_conn, "e2e_cross_signing_keys", "stream_id"
Expand Down
59 changes: 41 additions & 18 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,29 +1017,41 @@ def add_device_change_to_streams(self, user_id, device_ids, hosts):
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""
with self._device_list_id_gen.get_next() as stream_id:
if not device_ids:
return

with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids:
yield self.db.runInteraction(
"add_device_change_to_streams",
self._add_device_change_txn,
"add_device_change_to_stream",
self._add_device_change_to_stream_txn,
user_id,
device_ids,
stream_ids,
)

if not hosts:
return stream_ids[-1]

context = get_active_span_text_map()
with self._device_list_id_gen.get_next_mult(
len(hosts) * len(device_ids)
) as stream_ids:
yield self.db.runInteraction(
"add_device_outbound_poke_to_stream",
self._add_device_outbound_poke_to_stream_txn,
user_id,
device_ids,
hosts,
stream_id,
stream_ids,
context,
)
return stream_id

def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
now = self._clock.time_msec()
return stream_ids[-1]

def _add_device_change_to_stream_txn(self, txn, user_id, device_ids, stream_ids):
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
txn.call_after(
self._device_list_stream_cache.entity_has_changed, user_id, stream_id
self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1],
)
for host in hosts:
txn.call_after(
self._device_list_federation_stream_cache.entity_has_changed,
host,
stream_id,
)

# Delete older entries in the table, as we really only care about
# when the latest change happened.
Expand All @@ -1048,27 +1060,38 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
""",
[(user_id, device_id, stream_id) for device_id in device_ids],
[(user_id, device_id, stream_ids[0]) for device_id in device_ids],
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)

self.db.simple_insert_many_txn(
txn,
table="device_lists_stream",
values=[
{"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
for device_id in device_ids
for stream_id, device_id in zip(stream_ids, device_ids)
],
)

context = get_active_span_text_map()
def _add_device_outbound_poke_to_stream_txn(
self, txn, user_id, device_ids, hosts, stream_ids, context,
):
for host in hosts:
txn.call_after(
self._device_list_federation_stream_cache.entity_has_changed,
host,
stream_ids[-1],
)

now = self._clock.time_msec()
next_stream_id = iter(stream_ids)

self.db.simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
values=[
{
"destination": destination,
"stream_id": stream_id,
"stream_id": next(next_stream_id),
"user_id": user_id,
"device_id": device_id,
"sent": False,
Expand Down