diff --git a/changelog.d/16615.misc b/changelog.d/16615.misc new file mode 100644 index 000000000000..37ab711dc657 --- /dev/null +++ b/changelog.d/16615.misc @@ -0,0 +1 @@ +Use more generic database methods. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 7426dbcad611..62fbd055348a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -49,7 +49,11 @@ if TYPE_CHECKING: from synapse.server import HomeServer - from synapse.storage.database import DatabasePool, LoggingTransaction + from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + ) logger = logging.getLogger(__name__) @@ -746,10 +750,10 @@ async def create_index_in_background( The named index will be dropped upon completion of the new index. """ - def create_index_psql(conn: Connection) -> None: + def create_index_psql(conn: "LoggingDatabaseConnection") -> None: conn.rollback() # postgres insists on autocommit for the index - conn.set_session(autocommit=True) # type: ignore + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: c = conn.cursor() @@ -793,9 +797,9 @@ def create_index_psql(conn: Connection) -> None: undo_timeout_sql = f"SET statement_timeout = {default_timeout}" conn.cursor().execute(undo_timeout_sql) - conn.set_session(autocommit=False) # type: ignore + conn.engine.attempt_to_set_autocommit(conn.conn, False) - def create_index_sqlite(conn: Connection) -> None: + def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None: # Sqlite doesn't support concurrent creation of indexes. # # We assume that sqlite doesn't give us invalid indices; however @@ -825,7 +829,9 @@ def create_index_sqlite(conn: Connection) -> None: c.execute(sql) if isinstance(self.db_pool.engine, engines.PostgresEngine): - runner: Optional[Callable[[Connection], None]] = create_index_psql + runner: Optional[ + Callable[[LoggingDatabaseConnection], None] + ] = create_index_psql elif psql_only: runner = None else: diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index f4bef4c99b35..e25d86818b37 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -275,7 +275,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None: # we have to set autocommit, because postgres refuses to # CREATE INDEX CONCURRENTLY without it. - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: c = conn.cursor() @@ -301,7 +301,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None: # we should now be able to delete the GIST index. c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") finally: - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) if isinstance(self.database_engine, PostgresEngine): await self.db_pool.runWithConnection(create_index) @@ -323,7 +323,7 @@ async def _background_reindex_search_order( def create_index(conn: LoggingDatabaseConnection) -> None: conn.rollback() - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) c = conn.cursor() # We create with NULLS FIRST so that when we search *backwards* @@ -340,7 +340,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None: ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) """ ) - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) await self.db_pool.runWithConnection(create_index) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 0f9c550b27e4..2c3151526db8 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -492,7 +492,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None: conn.rollback() if isinstance(self.database_engine, PostgresEngine): # postgres insists on autocommit for the index - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: txn = conn.cursor() txn.execute( @@ -501,7 +501,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None: ) txn.execute("DROP INDEX IF EXISTS state_groups_state_id") finally: - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) else: txn = conn.cursor() txn.execute( diff --git a/tests/server.py b/tests/server.py index 5a63ecee9f2d..2b63ed3dd866 100644 --- a/tests/server.py +++ b/tests/server.py @@ -88,7 +88,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.database import LoggingDatabaseConnection -from synapse.storage.engines import PostgresEngine, create_engine +from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.types import ISynapseReactor, JsonDict from synapse.util import Clock @@ -1029,9 +1029,7 @@ def setup_test_homeserver( # Create the database before we actually try and connect to it, based off # the template database we generate in setupdb() - if isinstance(db_engine, PostgresEngine): - import psycopg2.extensions - + if USE_POSTGRES_FOR_TESTS: db_conn = db_engine.module.connect( dbname=POSTGRES_BASE_DB, user=POSTGRES_USER, @@ -1039,8 +1037,7 @@ def setup_test_homeserver( port=POSTGRES_PORT, password=POSTGRES_PASSWORD, ) - assert isinstance(db_conn, psycopg2.extensions.connection) - db_conn.autocommit = True + db_engine.attempt_to_set_autocommit(db_conn, True) cur = db_conn.cursor() cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) cur.execute( @@ -1065,13 +1062,12 @@ def setup_test_homeserver( hs.setup() - if isinstance(db_engine, PostgresEngine): + if USE_POSTGRES_FOR_TESTS: database_pool = hs.get_datastores().databases[0] # We need to do cleanup on PostgreSQL def cleanup() -> None: import psycopg2 - import psycopg2.extensions # Close all the db pools database_pool._db_pool.close() @@ -1086,8 +1082,7 @@ def cleanup() -> None: port=POSTGRES_PORT, password=POSTGRES_PASSWORD, ) - assert isinstance(db_conn, psycopg2.extensions.connection) - db_conn.autocommit = True + db_engine.attempt_to_set_autocommit(db_conn, True) cur = db_conn.cursor() # Try a few times to drop the DB. Some things may hold on to the