Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

fix race condiftion in calling initialise_reserved_users #4081

Merged
merged 7 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4081.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race condition in populating reserved users
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 0 additions & 8 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,6 @@ def start_generate_monthly_active_users():
generate_monthly_active_users,
)

# XXX is this really supposed to be a background process? it looks
# like it needs to complete before some of the other stuff runs.
run_as_background_process(
"initialise_reserved_users",
hs.get_datastore().initialise_reserved_users,
hs.config.mau_limits_reserved_threepids,
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
Expand Down
45 changes: 32 additions & 13 deletions synapse/storage/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,27 @@ def __init__(self, dbconn, hs):
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
self.initialise_reserved_users(
dbconn.cursor(), hs.config.mau_limits_reserved_threepids
)

@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
store = self.hs.get_datastore()
def initialise_reserved_users(self, txn, threepids):
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
"""Ensures that reserved threepids are accounted for in the MAU table, should
be called on start up.

Arguments:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be Args

threepids []: List of threepid dicts to reserve
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type here is incorrect

neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
"""
reserved_user_list = []

# Do not add more reserved users than the total allowable number
for tp in threepids[:self.hs.config.max_mau_value]:
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
user_id = yield store.get_user_id_by_threepid(
user_id = self.get_user_id_by_threepid_txn(
txn,
tp["medium"], tp["address"]
)
if user_id:
yield self.upsert_monthly_active_user(user_id)
self.upsert_monthly_active_user_txn(txn, user_id)
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
reserved_user_list.append(user_id)
else:
logger.warning(
Expand All @@ -55,8 +63,7 @@ def initialise_reserved_users(self, threepids):

@defer.inlineCallbacks
def reap_monthly_active_users(self):
"""
Cleans out monthly active user table to ensure that no stale
"""Cleans out monthly active user table to ensure that no stale
entries exist.

Returns:
Expand Down Expand Up @@ -165,19 +172,33 @@ def get_registered_reserved_users_count(self):

@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
"""Updates or inserts monthly active user member
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
Arguments:
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
user_id (str): user to add/update
"""
is_insert = yield self.runInteraction(
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
user_id
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())

def upsert_monthly_active_user_txn(self, txn, user_id):
"""
Updates or inserts monthly active user member
Arguments:
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
txn (cursor):
user_id (str): user to add/update
Deferred[bool]: True if a new entry was created, False if an
bool: True if a new entry was created, False if an
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
is_insert = self._simple_upsert_txn(
txn,
table="monthly_active_users",
keyvalues={
"user_id": user_id,
Expand All @@ -186,9 +207,7 @@ def upsert_monthly_active_user(self, user_id):
"timestamp": int(self._clock.time_msec()),
},
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
self.get_monthly_active_count.invalidate(())
return is_insert

@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
Expand Down
16 changes: 12 additions & 4 deletions synapse/storage/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,25 @@ def user_get_threepids(self, user_id):

@defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address):
ret = yield self._simple_select_one(
user_id = yield self.runInteraction(
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
medium, address
)
defer.returnValue(user_id)

def get_user_id_by_threepid_txn(self, txn, medium, address):
neilisfragile marked this conversation as resolved.
Show resolved Hide resolved
ret = self._simple_select_one_txn(
txn,
"user_threepids",
{
"medium": medium,
"address": address
},
['user_id'], True, 'get_user_id_by_threepid'
['user_id'], True
)
if ret:
defer.returnValue(ret['user_id'])
defer.returnValue(None)
return ret['user_id']
return None

def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
Expand Down
10 changes: 8 additions & 2 deletions tests/storage/test_monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def test_initialise_reserved_users(self):
now = int(self.hs.get_clock().time_msec())
self.store.user_add_threepid(user1, "email", user1_email, now, now)
self.store.user_add_threepid(user2, "email", user2_email, now, now)
self.store.initialise_reserved_users(threepids)

self.store.runInteraction(
"initialise", self.store.initialise_reserved_users, threepids
)
self.pump()

active_count = self.store.get_monthly_active_count()
Expand Down Expand Up @@ -199,7 +202,10 @@ def test_get_reserved_real_user_account(self):
{'medium': 'email', 'address': user2_email},
]
self.hs.config.mau_limits_reserved_threepids = threepids
self.store.initialise_reserved_users(threepids)
self.store.runInteraction(
"initialise", self.store.initialise_reserved_users, threepids
)

self.pump()
count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), 0)
Expand Down