Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch upsert user ips #2307

Merged
merged 2 commits into from
Jun 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.api.errors import AuthError, Codes
from synapse.types import UserID
from synapse.util import logcontext
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -200,7 +199,7 @@ def get_user_by_req(self, request, allow_guest=False, rights="access"):
default=[""]
)[0]
if user and access_token and ip_addr:
logcontext.preserve_fn(self.store.insert_client_ip)(
self.store.insert_client_ip(
user=user,
access_token=access_token,
ip=ip_addr,
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_devices_by_user(self, user_id):
device_map = yield self.store.get_devices_by_user(user_id)

ips = yield self.store.get_last_client_ip_by_device(
devices=((user_id, device_id) for device_id in device_map.keys())
user_id, device_id=None
)

devices = device_map.values()
Expand All @@ -133,7 +133,7 @@ def get_device(self, user_id, device_id):
except errors.StoreError:
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(
devices=((user_id, device_id),)
user_id, device_id,
)
_update_device_from_client_ips(device, ips)
defer.returnValue(device)
Expand Down
10 changes: 0 additions & 10 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,6 @@ def _count_users(txn):
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)

def get_user_ip_and_agents(self, user):
return self._simple_select_list(
table="user_ips",
keyvalues={"user_id": user.to_string()},
retcols=[
"access_token", "ip", "user_agent", "last_seen"
],
desc="get_user_ip_and_agents",
)

def get_users(self):
"""Function to reterive a list of users in users table.

Expand Down
127 changes: 97 additions & 30 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging

from twisted.internet import defer
from twisted.internet import defer, reactor

from ._base import Cache
from . import background_updates
Expand Down Expand Up @@ -50,7 +50,14 @@ def __init__(self, hs):
columns=["user_id", "device_id", "last_seen"],
)

@defer.inlineCallbacks
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)

def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
Expand All @@ -62,34 +69,48 @@ def insert_client_ip(self, user, access_token, ip, user_agent, device_id):

# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
return

self.client_ip_last_seen.prefill(key, now)

# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": device_id,
},
values={
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
self._batch_row_update[key] = (user_agent, device_id, now)

def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")

for entry in to_update.iteritems():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry

self._simple_upsert_txn(
txn,
table="user_ips",
keyvalues={
"user_id": user_id,
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": device_id,
},
values={
"last_seen": last_seen,
},
lock=False,
)

@defer.inlineCallbacks
def get_last_client_ip_by_device(self, devices):
def get_last_client_ip_by_device(self, user_id, device_id):
"""For each device_id listed, give the user_ip it was last seen on

Args:
devices (iterable[(str, str)]): list of (user_id, device_id) pairs
user_id (str)
device_id (str): If None fetches all devices for the user

Returns:
defer.Deferred: resolves to a dict, where the keys
Expand All @@ -100,6 +121,7 @@ def get_last_client_ip_by_device(self, devices):
res = yield self.runInteraction(
"get_last_client_ip_by_device",
self._get_last_client_ip_by_device_txn,
user_id, device_id,
retcols=(
"user_id",
"access_token",
Expand All @@ -108,23 +130,34 @@ def get_last_client_ip_by_device(self, devices):
"device_id",
"last_seen",
),
devices=devices
)

ret = {(d["user_id"], d["device_id"]): d for d in res}
for key in self._batch_row_update:
uid, access_token, ip = key
if uid == user_id:
user_agent, did, last_seen = self._batch_row_update[key]
if not device_id or did == device_id:
ret[(user_id, device_id)] = {
"user_id": user_id,
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": did,
"last_seen": last_seen,
}
defer.returnValue(ret)

@classmethod
def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
where_clauses = []
bindings = []
for (user_id, device_id) in devices:
if device_id is None:
where_clauses.append("(user_id = ? AND device_id IS NULL)")
bindings.extend((user_id, ))
else:
where_clauses.append("(user_id = ? AND device_id = ?)")
bindings.extend((user_id, device_id))
if device_id is None:
where_clauses.append("user_id = ?")
bindings.extend((user_id, ))
else:
where_clauses.append("(user_id = ? AND device_id = ?)")
bindings.extend((user_id, device_id))

if not where_clauses:
return []
Expand Down Expand Up @@ -152,3 +185,37 @@ def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):

txn.execute(sql, bindings)
return cls.cursor_to_dict(txn)

@defer.inlineCallbacks
def get_user_ip_and_agents(self, user):
user_id = user.to_string()
results = {}

for key in self._batch_row_update:
uid, access_token, ip = key
if uid == user_id:
user_agent, _, last_seen = self._batch_row_update[key]
results[(access_token, ip)] = (user_agent, last_seen)

rows = yield self._simple_select_list(
table="user_ips",
keyvalues={"user_id": user_id},
retcols=[
"access_token", "ip", "user_agent", "last_seen"
],
desc="get_user_ip_and_agents",
)

results.update(
((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
for row in rows
)
defer.returnValue(list(
{
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"last_seen": last_seen,
}
for (access_token, ip), (user_agent, last_seen) in results.iteritems()
))
5 changes: 1 addition & 4 deletions tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ def test_insert_new_client_ip(self):
"access_token", "ip", "user_agent", "device_id",
)

# deliberately use an iterable here to make sure that the lookup
# method doesn't iterate it twice
device_list = iter(((user_id, "device_id"),))
result = yield self.store.get_last_client_ip_by_device(device_list)
result = yield self.store.get_last_client_ip_by_device(user_id, "device_id")

r = result[(user_id, "device_id")]
self.assertDictContainsSubset(
Expand Down