Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Delete stale non-e2e devices for users, take 2 #14595

Merged
merged 5 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14595.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune user's old devices on login if they have too many.
31 changes: 30 additions & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -421,6 +422,9 @@ async def check_device_registered(

self._check_device_name_length(initial_device_display_name)

# Prune the user's device list if they already have a lot of devices.
await self._prune_too_many_devices(user_id)

if device_id is not None:
new_device = await self.store.store_device(
user_id=user_id,
Expand Down Expand Up @@ -452,6 +456,31 @@ async def check_device_registered(

raise errors.StoreError(500, "Couldn't generate a device ID.")

async def _prune_too_many_devices(self, user_id: str) -> None:
"""Delete any excess old devices this user may have."""
device_ids = await self.store.check_too_many_devices_for_user(user_id)
if not device_ids:
return

# We don't want to block and try and delete tonnes of devices at once,
# so we cap the number of devices we delete synchronously.
first_batch, remaining_device_ids = device_ids[:10], device_ids[10:]
await self.delete_devices(user_id, first_batch)
Comment on lines +465 to +468
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not delete them all in the background?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mainly thinking that it's nice to delete some devices before we let someone create a new one, so that we know every log in we delete at least some of the stale devices.

I don't really think it matters much in hindsight though.


if not remaining_device_ids:
return

# Now spawn a background loop that deletes the rest.
async def _prune_too_many_devices_loop() -> None:
for batch in batch_iter(remaining_device_ids, 10):
await self.delete_devices(user_id, batch)

await self.clock.sleep(1)

run_as_background_process(
"_prune_too_many_devices_loop", _prune_too_many_devices_loop
)

async def _delete_stale_devices(self) -> None:
"""Background task that deletes devices which haven't been accessed for more than
a configured time period.
Expand Down Expand Up @@ -481,7 +510,7 @@ async def delete_all_devices_for_user(
device_ids = [d for d in device_ids if d != except_device_id]
await self.delete_devices(user_id, device_ids)

async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
async def delete_devices(self, user_id: str, device_ids: Collection[str]) -> None:
"""Delete several devices

Args:
Expand Down
79 changes: 78 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,72 @@ def _txn(txn: LoggingTransaction) -> int:

return rows

async def check_too_many_devices_for_user(self, user_id: str) -> List[str]:
"""Check if the user has a lot of devices, and if so return the set of
devices we can prune.

This does *not* return hidden devices or devices with E2E keys.
"""

num_devices = await self.db_pool.simple_select_one_onecol(
table="devices",
keyvalues={"user_id": user_id, "hidden": False},
retcol="COALESCE(COUNT(*), 0)",
desc="count_devices",
)

# We let users have up to ten devices without pruning.
if num_devices <= 10:
return []

# We prune everything older than N days.
max_last_seen = self._clock.time_msec() - 14 * 24 * 60 * 60 * 1000

if num_devices > 50:
# If the user has more than 50 devices, then we chose a last seen
# that ensures we keep at most 50 devices.
sql = """
SELECT last_seen FROM devices
LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
WHERE
user_id = ?
AND NOT hidden
AND last_seen IS NOT NULL
AND key_json IS NULL
ORDER BY last_seen DESC
LIMIT 1
OFFSET 50
"""

rows = await self.db_pool.execute(
"check_too_many_devices_for_user_last_seen", None, sql, (user_id,)
)
if rows:
max_last_seen = max(rows[0][0], max_last_seen)

# Now fetch the devices to delete.
sql = """
SELECT DISTINCT device_id FROM devices
LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
WHERE
user_id = ?
AND NOT hidden
AND last_seen < ?
AND key_json IS NULL
ORDER BY last_seen
"""

def check_too_many_devices_for_user_txn(
txn: LoggingTransaction,
) -> List[str]:
txn.execute(sql, (user_id, max_last_seen))
return [device_id for device_id, in txn]

return await self.db_pool.runInteraction(
"check_too_many_devices_for_user",
check_too_many_devices_for_user_txn,
)


class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Because we have write access, this will be a StreamIdGenerator
Expand Down Expand Up @@ -1596,6 +1662,7 @@ async def store_device(
values={},
insertion_values={
"display_name": initial_device_display_name,
"last_seen": self._clock.time_msec(),
"hidden": False,
},
desc="store_device",
Expand Down Expand Up @@ -1641,7 +1708,15 @@ async def store_device(
)
raise StoreError(500, "Problem storing device.")

async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
@cached(max_entries=0)
async def delete_device(self, user_id: str, device_id: str) -> None:
raise NotImplementedError()

# Note: sometimes deleting rows out of `device_inbox` can take a long time,
# so we use a cache so that we deduplicate in flight requests to delete
# devices.
@cachedList(cached_method_name="delete_device", list_name="device_ids")
async def delete_devices(self, user_id: str, device_ids: Collection[str]) -> dict:
"""Deletes several devices.

Args:
Expand Down Expand Up @@ -1678,6 +1753,8 @@ def _delete_devices_txn(txn: LoggingTransaction) -> None:
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))

return {}

async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_get_devices_by_user(self) -> None:
"device_id": "xyz",
"display_name": "display 0",
"last_seen_ip": None,
"last_seen_ts": None,
"last_seen_ts": 1000000,
},
device_map["xyz"],
)
Expand Down
4 changes: 3 additions & 1 deletion tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def test_get_last_client_ip_by_device(self, after_persisting: bool):
)
)

last_seen = self.clock.time_msec()

if after_persisting:
# Trigger the storage loop
self.reactor.advance(10)
Expand All @@ -189,7 +191,7 @@ def test_get_last_client_ip_by_device(self, after_persisting: bool):
"device_id": device_id,
"ip": None,
"user_agent": None,
"last_seen": None,
"last_seen": last_seen,
},
],
)
Expand Down