Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove duplicates in the user_ips table and add an index #4370

Merged
merged 23 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ cache:
#
- $HOME/.cache/pip/wheels

addons:
postgresql: "9.4"

# don't clone the whole repo history, one commit will do
git:
depth: 1
Expand Down
1 change: 1 addition & 0 deletions changelog.d/4370.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Apply a unique index to the user_ips table, preventing duplicates.
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
90 changes: 79 additions & 11 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,21 @@ def __init__(self, db_conn, hs):
columns=["last_seen"],
)

# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self.register_background_update_handler(
"user_ips_remove_dupes",
self._remove_user_ip_dupes,
)

# Register a unique index
self.register_background_index_update(
"user_ips_device_unique_index",
index_name="user_ips_device_unique_id",
table="user_ips",
columns=["access_token", "ip", "user_agent"],
unique=True,
)

# (access_token, ip, user_agent) -> (user_id, device_id, last_seen)
self._batch_row_update = {}

self._client_ip_looper = self._clock.looping_call(
Expand All @@ -75,12 +89,66 @@ def __init__(self, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

@defer.inlineCallbacks
def _remove_user_ip_dupes(self, progress, batch_size):
    """Background update handler for "user_ips_remove_dupes".

    Runs the whole de-duplication in a single pass and then marks the
    background update as finished.

    Args:
        progress: progress passed in by the background updater (not used
            here).
        batch_size: requested batch size (not used here; the clean-up is
            not batched).

    Returns:
        Deferred[int]: always resolves to 1.
    """
    update_name = "user_ips_remove_dupes"

    yield self._remove_user_ip_dupes_impl()
    yield self._end_background_update(update_name)

    defer.returnValue(1)

@defer.inlineCallbacks
def _remove_user_ip_dupes_impl(self):
    """Delete stale duplicate rows from the user_ips table.

    Fetches every distinct user_id in user_ips and then, user by user,
    looks for (access_token, ip, user_agent) triples that occur more than
    once. For each such triple, rows whose last_seen differs from the
    newest last_seen observed for that triple are deleted.
    """

    def _fetch_user_ids(txn):
        # One single-column row per distinct user_id.
        txn.execute("SELECT DISTINCT user_id FROM user_ips;")
        return txn.fetchall()

    users = yield self.runInteraction(
        "user_ips_dups_get_users", _fetch_user_ids
    )

    def _clean(txn, user_id):
        # Pull every row recorded for this user.
        txn.execute(
            (
                "SELECT access_token, ip, user_agent, last_seen "
                "FROM user_ips WHERE user_id = ?"
            ),
            (user_id,)
        )
        rows = txn.fetchall()

        # (access_token, ip, user_agent) -> newest last_seen seen so far
        newest = {}
        # Keys that have appeared on more than one row.
        duplicates = set()

        for row in rows:
            key = row[0:3]
            last_seen = row[3]
            if key in newest:
                duplicates.add(key)
                if newest[key] < last_seen:
                    newest[key] = last_seen
            else:
                newest[key] = last_seen

        # Sorted so that the DELETEs are issued in a deterministic order.
        for key in sorted(duplicates):
            access_token, ip, user_agent = key
            # NOTE(review): rows that tie on the newest last_seen are all
            # kept, and the DELETE is not restricted to this user_id.
            # NOTE(review): `IS ?` is a null-safe comparison in SQLite;
            # confirm it is accepted by the other supported engines.
            txn.execute(
                (
                    "DELETE FROM user_ips WHERE access_token IS ? AND ip IS ? "
                    "AND user_agent IS ? AND last_seen != ?"
                ),
                (access_token, ip, user_agent, newest[key])
            )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May as well use the _simple_{insert,delete}_txn wrappers here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeahhhh but everything else in this function is sql :P


# Run the clean-up once per user; each row of `users` is a 1-tuple, so
# user[0] is the user_id.
for user in users:
yield self.runInteraction("user_ips_clean", _clean, user[0])

@defer.inlineCallbacks
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
if not now:
now = int(self._clock.time_msec())
key = (user_id, access_token, ip)
key = (access_token, ip, user_agent)

try:
last_seen = self.client_ip_last_seen.get(key)
Expand All @@ -93,7 +161,7 @@ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,

self.client_ip_last_seen.prefill(key, now)

self._batch_row_update[key] = (user_agent, device_id, now)
self._batch_row_update[key] = (user_id, device_id, now)

def _update_client_ips_batch(self):

Expand All @@ -117,20 +185,20 @@ def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")

for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
(access_token, ip, user_agent), (user_id, device_id, last_seen) = entry

try:
self._simple_upsert_txn(
txn,
table="user_ips",
keyvalues={
"user_id": user_id,
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
"device_id": device_id,
},
values={
"user_id": user_id,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
Expand Down Expand Up @@ -169,9 +237,9 @@ def get_last_client_ip_by_device(self, user_id, device_id):

ret = {(d["user_id"], d["device_id"]): d for d in res}
for key in self._batch_row_update:
uid, access_token, ip = key
if uid == user_id:
user_agent, did, last_seen = self._batch_row_update[key]
access_token, ip, user_agent = key
user_id, did, last_seen = self._batch_row_update[key]
if user_id == user_id:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cough

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMMM

yeah need some coffee

if not device_id or did == device_id:
ret[(user_id, device_id)] = {
"user_id": user_id,
Expand Down Expand Up @@ -227,9 +295,9 @@ def get_user_ip_and_agents(self, user):
results = {}

for key in self._batch_row_update:
uid, access_token, ip = key
access_token, ip, user_agent = key
uid, _, last_seen = self._batch_row_update[key]
if uid == user_id:
user_agent, _, last_seen = self._batch_row_update[key]
results[(access_token, ip)] = (user_agent, last_seen)

rows = yield self._simple_select_list(
Expand Down
22 changes: 22 additions & 0 deletions synapse/storage/schema/delta/53/user_ips_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Schedule the background update that deletes duplicate rows from user_ips.
-- This has to happen before the unique index below can be created.
INSERT INTO background_updates (update_name, progress_json) VALUES
('user_ips_remove_dupes', '{}');

-- Schedule the background update that adds a new unique index to the
-- user_ips table; depends_on names 'user_ips_remove_dupes' as its
-- prerequisite.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('user_ips_device_unique_index', '{}', 'user_ips_remove_dupes');
57 changes: 57 additions & 0 deletions tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,63 @@ def make_homeserver(self, reactor, clock):
def prepare(self, hs, reactor, clock):
self.store = self.hs.get_datastore()

def test_cleanup(self):
    """Check the SQL issued by _remove_user_ip_dupes_impl.

    The store's runInteraction is replaced with a shim that calls the
    interaction function straight away with a fake transaction, so the
    statements executed (and their arguments) can be inspected.
    """
    # Every (sql, args) pair handed to txn.execute, in call order.
    recorded = []

    # Canned fetchall() results, consumed from the front.
    canned_rows = [
        # Initial fetch of the distinct users.
        [("user1",), ("user2",)],
        # user1 has duplicates for access tokens A and B.
        [
            ("A", "1", "user agent", 1234),
            ("A", "1", "user agent", 1235),
            ("A", "1", "user agent", 1236),
            ("B", "1", "user agent", 1237),
            ("B", "1", "user agent", 1238),
        ],
        # user2 has no duplicates.
        [
            ("C", "2", "user agent", 1234),
            ("D", "2", "user agent", 1234),
        ],
    ]

    def fake_execute(sql, args=None):
        recorded.append((sql, args))

    def fake_fetchall():
        return canned_rows.pop(0)

    txn = Mock()
    txn.execute = fake_execute
    txn.fetchall = fake_fetchall

    def run_inline(name, func, *args):
        return func(txn, *args)

    self.store.runInteraction = run_inline

    self.get_success(self.store._remove_user_ip_dupes_impl())

    # Five statements are expected, in this order:
    #   1. fetch all the users from user_ips
    #   2. fetch user1's entries
    #   3. delete user1's duplicates for access token A
    #   4. delete user1's duplicates for access token B
    #   5. fetch user2's entries
    self.assertEqual(len(recorded), 5)

    # The first deletion spares only the latest row for access token A.
    self.assertEqual(recorded[2][1], ("A", "1", "user agent", 1236))

    # The second deletion spares only the latest row for access token B.
    self.assertEqual(recorded[3][1], ("B", "1", "user agent", 1238))

    verbs = [sql.split(" ")[0] for sql, _ in recorded]
    self.assertEqual(
        verbs,
        ["SELECT", "SELECT", "DELETE", "DELETE", "SELECT"],
    )

def test_insert_new_client_ip(self):
self.reactor.advance(12345678)

Expand Down