Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Require SQLite >= 3.27.0 #13760

Merged
merged 5 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13760.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse will now refuse to start if configured to use SQLite < 3.27.
47 changes: 21 additions & 26 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,14 @@ def __init__(
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")

if self.engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)

def name(self) -> str:
"Return the name of this database"
Expand Down Expand Up @@ -1160,11 +1159,8 @@ async def simple_upsert(
attempts = 0
while True:
try:
# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it is safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables

return await self.runInteraction(
desc,
Expand Down Expand Up @@ -1199,22 +1195,23 @@ def simple_upsert_txn(
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.

Args:
txn: The transaction to use.
table: The table to upsert into
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
Expand Down Expand Up @@ -1365,14 +1362,12 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""

# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables

await self.runInteraction(
desc,
Expand Down Expand Up @@ -1406,10 +1401,10 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
Expand Down
121 changes: 39 additions & 82 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,91 +129,48 @@ async def _try_acquire_lock(
now = self._clock.time_msec()
token = random_string(6)

if self.db_pool.engine.can_native_upsert:

def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)

# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)

did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
if not did_lock:
return None

else:
# If we're on an old SQLite we emulate the above logic by first
# clearing out any existing stale locks and then upserting.

def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
sql = """
DELETE FROM worker_locks
WHERE
lock_name = ?
AND lock_key = ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)

inserted = self.db_pool.simple_upsert_txn_emulated(
txn,
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
},
values={},
insertion_values={
"token": token,
"last_renewed_ts": self._clock.time_msec(),
"instance_name": self._instance_name,
},
)

return inserted

did_lock = await self.db_pool.runInteraction(
"try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
)
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)

if not did_lock:
return None
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
)
if not did_lock:
return None

lock = Lock(
self._reactor,
Expand Down
86 changes: 34 additions & 52 deletions synapse/storage/databases/main/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,59 +446,41 @@ def _upsert_with_additive_relatives_txn(
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]

relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]

insert_cols = []
qargs = []

for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]

relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]

insert_cols = []
qargs = []

for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)

sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}

sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}

txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self.db_pool.simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
self.db_pool.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
if current_row[key] is None:
current_row[key] = val
else:
current_row[key] += val
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
txn.execute(sql, qargs)

async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
Expand Down
30 changes: 9 additions & 21 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,25 +221,15 @@ async def set_destination_retry_timings(
retry_interval: how long until next retry in ms
"""

if self.database_engine.can_native_upsert:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as its a single upsert
)
else:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_emulated,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as it's a single upsert
)

def _set_destination_retry_timings_native(
self,
Expand All @@ -249,8 +239,6 @@ def _set_destination_retry_timings_native(
retry_last_ts: int,
retry_interval: int,
) -> None:
assert self.database_engine.can_native_upsert

# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
Expand Down
8 changes: 0 additions & 8 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
def single_threaded(self) -> bool:
...

@property
@abc.abstractmethod
def can_native_upsert(self) -> bool:
"""
Do we support native UPSERTs?
"""
...

@property
@abc.abstractmethod
def supports_using_any_list(self) -> bool:
Expand Down
7 changes: 0 additions & 7 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,6 @@ def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None:
cursor.close()
db_conn.commit()

@property
def can_native_upsert(self) -> bool:
"""
Can we use native UPSERTs?
"""
return True

@property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
Expand Down
Loading