Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Start work on avoiding table locks for upserts #2684

Merged
merged 4 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,31 +469,54 @@ def _simple_insert_many_txn(txn, table, values):

txn.executemany(sql, vals)

@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
"""

`lock` should generally be set to True (the default), but can be set
to False if either of the following are true:

* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.

* we somehow know that we are the only thread which will be updating
this table.

Args:
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting
insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
"""
return self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock
)
while True:
try:
result = yield self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
)

def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True):
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)

# Try to update
# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
Expand All @@ -502,23 +525,24 @@ def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
sqlargs = values.values() + keyvalues.values()

txn.execute(sql, sqlargs)
if txn.rowcount == 0:
# We didn't update and rows so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)
if txn.rowcount > 0:
# successfully updated at least one row.
return False

sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
# We didn't update any rows so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)

return True
else:
return False
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
# successfully inserted
return True

def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
Expand Down
55 changes: 28 additions & 27 deletions synapse/storage/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,34 +204,35 @@ def add_pusher(self, user_id, access_token, kind, app_id,
pushkey, pushkey_ts, lang, data, last_stream_ordering,
profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
def f(txn):
newly_inserted = self._simple_upsert_txn(
txn,
"pushers",
{
"app_id": app_id,
"pushkey": pushkey,
"user_name": user_id,
},
{
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
},
)
if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
newly_inserted = yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
"pushkey": pushkey,
"user_name": user_id,
},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
},
desc="add_pusher",
lock=False,
)

yield self.runInteraction("add_pusher", f)
if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
self.get_if_user_has_pusher.invalidate(user_id,)

@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
Expand Down