From 469e9a6d0b0e2887d94abd074d07083d5aa65440 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 8 Sep 2022 18:02:00 +0100 Subject: [PATCH 1/5] Require SQLite >= 3.27.0 --- synapse/storage/engines/sqlite.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 621f2c5efe28..4d638ddbd4d1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -70,12 +70,11 @@ def check_database( self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False ) -> None: if not allow_outdated_version: - version = sqlite3.sqlite_version_info # Synapse is untested against older SQLite versions, and we don't want # to let users upgrade to a version of Synapse with broken support for their # sqlite version, because it risks leaving them with a half-upgraded db. - if version < (3, 22, 0): - raise RuntimeError("Synapse requires sqlite 3.22 or above.") + if sqlite3.sqlite_version_info < (3, 27, 0): + raise RuntimeError("Synapse requires sqlite 3.27 or above.") def check_new_database(self, txn: Cursor) -> None: """Gets called when setting up a brand new database. This allows us to From 4e1797b7ce3a919c7e78dab57b58e1735db5034c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 8 Sep 2022 18:16:39 +0100 Subject: [PATCH 2/5] Assume `can_native_upsert` is True --- synapse/storage/database.py | 34 ++--- synapse/storage/databases/main/lock.py | 121 ++++++------------ synapse/storage/databases/main/stats.py | 86 +++++-------- .../storage/databases/main/transactions.py | 30 ++--- tests/storage/test_base.py | 1 - 5 files changed, 96 insertions(+), 176 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b394a6658b09..7f6b0e71b0c2 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -533,15 +533,14 @@ def __init__( if isinstance(self.engine, Sqlite3Engine): self._unsafe_to_upsert_tables.add("user_directory_search") - if self.engine.can_native_upsert: - # Check ASAP (and then later, every 1s) to see if we have finished - # background updates of tables that aren't safe to update. - self._clock.call_later( - 0.0, - run_as_background_process, - "upsert_safety_check", - self._check_safe_to_upsert, - ) + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates of tables that aren't safe to update. + self._clock.call_later( + 0.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert, + ) def name(self) -> str: "Return the name of this database" @@ -1160,11 +1159,8 @@ async def simple_upsert( attempts = 0 while True: try: - # We can autocommit if we are going to use native upserts - autocommit = ( - self.engine.can_native_upsert - and table not in self._unsafe_to_upsert_tables - ) + # We can autocommit if it is safe to upsert + autocommit = table not in self._unsafe_to_upsert_tables return await self.runInteraction( desc, @@ -1214,7 +1210,7 @@ def simple_upsert_txn( """ insertion_values = insertion_values or {} - if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: + if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_txn_native_upsert( txn, table, keyvalues, values, insertion_values=insertion_values ) @@ -1369,10 +1365,8 @@ async def simple_upsert_many( supports native upserts. 
""" - # We can autocommit if we are going to use native upserts - autocommit = ( - self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables - ) + # We can autocommit if it safe to upsert + autocommit = table not in self._unsafe_to_upsert_tables await self.runInteraction( desc, @@ -1409,7 +1403,7 @@ def simple_upsert_many_txn( lock: True to lock the table when doing the upsert. Unused if the database engine supports native upserts. """ - if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: + if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( txn, table, key_names, key_values, value_names, value_values ) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 2d7633fbd5ed..7270ef09da9e 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -129,91 +129,48 @@ async def _try_acquire_lock( now = self._clock.time_msec() token = random_string(6) - if self.db_pool.engine.can_native_upsert: - - def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: - # We take out the lock if either a) there is no row for the lock - # already, b) the existing row has timed out, or c) the row is - # for this instance (which means the process got killed and - # restarted) - sql = """ - INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (lock_name, lock_key) - DO UPDATE - SET - token = EXCLUDED.token, - instance_name = EXCLUDED.instance_name, - last_renewed_ts = EXCLUDED.last_renewed_ts - WHERE - worker_locks.last_renewed_ts < ? - OR worker_locks.instance_name = EXCLUDED.instance_name - """ - txn.execute( - sql, - ( - lock_name, - lock_key, - self._instance_name, - token, - now, - now - _LOCK_TIMEOUT_MS, - ), - ) - - # We only acquired the lock if we inserted or updated the table. - return bool(txn.rowcount) - - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock", - _try_acquire_lock_txn, - # We can autocommit here as we're executing a single query, this - # will avoid serialization errors. - db_autocommit=True, + def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: + # We take out the lock if either a) there is no row for the lock + # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) + sql = """ + INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (lock_name, lock_key) + DO UPDATE + SET + token = EXCLUDED.token, + instance_name = EXCLUDED.instance_name, + last_renewed_ts = EXCLUDED.last_renewed_ts + WHERE + worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name + """ + txn.execute( + sql, + ( + lock_name, + lock_key, + self._instance_name, + token, + now, + now - _LOCK_TIMEOUT_MS, + ), ) - if not did_lock: - return None - - else: - # If we're on an old SQLite we emulate the above logic by first - # clearing out any existing stale locks and then upserting. - - def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: - sql = """ - DELETE FROM worker_locks - WHERE - lock_name = ? - AND lock_key = ? - AND (last_renewed_ts < ? OR instance_name = ?) 
- """ - txn.execute( - sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), - ) - - inserted = self.db_pool.simple_upsert_txn_emulated( - txn, - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - }, - values={}, - insertion_values={ - "token": token, - "last_renewed_ts": self._clock.time_msec(), - "instance_name": self._instance_name, - }, - ) - - return inserted - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn - ) + # We only acquired the lock if we inserted or updated the table. + return bool(txn.rowcount) - if not did_lock: - return None + did_lock = await self.db_pool.runInteraction( + "try_acquire_lock", + _try_acquire_lock_txn, + # We can autocommit here as we're executing a single query, this + # will avoid serialization errors. + db_autocommit=True, + ) + if not did_lock: + return None lock = Lock( self._reactor, diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index b4c652acf34e..356d4ca78819 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -446,59 +446,41 @@ def _upsert_with_additive_relatives_txn( absolutes: Absolute (set) fields additive_relatives: Fields that will be added onto if existing row present. """ - if self.database_engine.can_native_upsert: - absolute_updates = [ - "%(field)s = EXCLUDED.%(field)s" % {"field": field} - for field in absolutes.keys() - ] - - relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" - % {"table": table, "field": field} - for field in additive_relatives.keys() - ] - - insert_cols = [] - qargs = [] - - for (key, val) in chain( - keyvalues.items(), absolutes.items(), additive_relatives.items() - ): - insert_cols.append(key) - qargs.append(val) + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } - sql = """ - INSERT INTO %(table)s (%(insert_cols_cs)s) - VALUES (%(insert_vals_qs)s) - ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s - """ % { - "table": table, - "insert_cols_cs": ", ".join(insert_cols), - "insert_vals_qs": ", ".join( - ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) - ), - "key_columns": ", ".join(keyvalues), - "updates": ", ".join(chain(absolute_updates, relative_updates)), - } - - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, table) - retcols = list(chain(absolutes.keys(), additive_relatives.keys())) - current_row = self.db_pool.simple_select_one_txn( - txn, table, keyvalues, retcols, allow_none=True - ) - if current_row is None: - merged_dict = {**keyvalues, **absolutes, 
**additive_relatives} - self.db_pool.simple_insert_txn(txn, table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - if current_row[key] is None: - current_row[key] = val - else: - current_row[key] += val - current_row.update(absolutes) - self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) + txn.execute(sql, qargs) async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None: """Calculate and insert an entry into room_stats_current. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index ba79e19f7fe8..f8c6877ee847 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -221,25 +221,15 @@ async def set_destination_retry_timings( retry_interval: how long until next retry in ms """ - if self.database_engine.can_native_upsert: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_native, - destination, - failure_ts, - retry_last_ts, - retry_interval, - db_autocommit=True, # Safe as its a single upsert - ) - else: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_emulated, - destination, - failure_ts, - retry_last_ts, - retry_interval, - ) + await self.db_pool.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings_native, + destination, + failure_ts, + retry_last_ts, + retry_interval, + db_autocommit=True, # Safe as it's a single upsert + ) def _set_destination_retry_timings_native( self, @@ -249,8 +239,6 @@ def _set_destination_retry_timings_native( retry_last_ts: int, retry_interval: int, ) -> None: - assert self.database_engine.can_native_upsert - # Upsert retry time interval if retry_interval is zero (i.e. we're # resetting it) or greater than the existing retry interval. # diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index cce8e75c7475..40e58f8199d0 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -54,7 +54,6 @@ def runWithConnection(func, *args, **kwargs): sqlite_config = {"name": "sqlite3"} engine = create_engine(sqlite_config) fake_engine = Mock(wraps=engine) - fake_engine.can_native_upsert = False fake_engine.in_transaction.return_value = False db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine) From d138ce8d8b2299d25b7f61d04f315e7a62bedfdb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 8 Sep 2022 18:27:41 +0100 Subject: [PATCH 3/5] Remove `can_native_upsert` --- synapse/storage/engines/_base.py | 8 -------- synapse/storage/engines/postgres.py | 7 ------- synapse/storage/engines/sqlite.py | 8 -------- 3 files changed, 23 deletions(-) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 971ff8269323..0d16a419a42c 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -43,14 +43,6 @@ def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]): def single_threaded(self) -> bool: ... - @property - @abc.abstractmethod - def can_native_upsert(self) -> bool: - """ - Do we support native UPSERTs? - """ - ... 
- @property @abc.abstractmethod def supports_using_any_list(self) -> bool: diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 517f9d5f98d7..7f7d006ac209 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -158,13 +158,6 @@ def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None: cursor.close() db_conn.commit() - @property - def can_native_upsert(self) -> bool: - """ - Can we use native UPSERTs? - """ - return True - @property def supports_using_any_list(self) -> bool: """Do we support using `a = ANY(?)` and passing a list""" diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 4d638ddbd4d1..095ae0a096b8 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -48,14 +48,6 @@ def __init__(self, database_config: Mapping[str, Any]): def single_threaded(self) -> bool: return True - @property - def can_native_upsert(self) -> bool: - """ - Do we support native UPSERTs? This requires SQLite3 3.24+, plus some - more work we haven't done yet to tell what was inserted vs updated. - """ - return sqlite3.sqlite_version_info >= (3, 24, 0) - @property def supports_using_any_list(self) -> bool: """Do we support using `a = ANY(?)` and passing a list""" From e7ac0c0d00cf1b63e209a95cd7000fc07d611092 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 8 Sep 2022 18:29:57 +0100 Subject: [PATCH 4/5] Changelog --- changelog.d/13760.removal | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13760.removal diff --git a/changelog.d/13760.removal b/changelog.d/13760.removal new file mode 100644 index 000000000000..624e7c367846 --- /dev/null +++ b/changelog.d/13760.removal @@ -0,0 +1 @@ +Synapse will now refuse to start if configured to use SQLite < 3.27. From 25f10b98b3c57cf16ecf9215cea7dfa5cecf6868 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 8 Sep 2022 18:34:49 +0100 Subject: [PATCH 5/5] Adjust upsert-related comments --- synapse/storage/database.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 7f6b0e71b0c2..e881bff7fb48 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1195,7 +1195,7 @@ def simple_upsert_txn( ) -> bool: """ Pick the UPSERT method which works best on the platform. Either the - native one (Pg9.5+, recent SQLites), or fall back to an emulated method. + native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method. Args: txn: The transaction to use. @@ -1203,7 +1203,8 @@ def simple_upsert_txn( keyvalues: The unique key tables and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting - lock: True to lock the table when doing the upsert. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) @@ -1361,8 +1362,8 @@ async def simple_upsert_many( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. 
""" # We can autocommit if it safe to upsert @@ -1400,8 +1401,8 @@ def simple_upsert_many_txn( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. """ if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert(