diff --git a/tests/storage/test_database.py b/tests/storage/test_database.py index 8cd7c89ca2f8..8dcf7ef86fee 100644 --- a/tests/storage/test_database.py +++ b/tests/storage/test_database.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Tuple +from typing import Callable, Tuple, List from unittest.mock import Mock, call from twisted.internet import defer @@ -29,6 +29,7 @@ from synapse.util import Clock from tests import unittest +from tests.utils import USE_POSTGRES_FOR_TESTS class TupleComparisonClauseTestCase(unittest.TestCase): @@ -278,3 +279,49 @@ def _test_txn(txn: LoggingTransaction) -> None: ] ) self.assertEqual(exception_callback.call_count, 6) # no additional calls + +class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase): + if not USE_POSTGRES_FOR_TESTS: + skip = "Requires Postgres" + + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.db_pools = homeserver.get_datastores().databases + + def test_all_tables_have_postgres_replica_identity(self) -> None: + """ + Tests that all tables have a Postgres REPLICA IDENTITY. + (See #16224). + + Tables with a PRIMARY KEY have an implied REPLICA IDENTITY and are fine. + Other tables need them to be set with `ALTER TABLE`. + + A REPLICA IDENTITY is required for Postgres logical replication to work + properly without blocking updates and deletes. + """ + + sql = """ + WITH tables_no_pkey AS ( + SELECT tbl.table_schema, tbl.table_name + FROM information_schema.tables tbl + WHERE table_type = 'BASE TABLE' + AND table_schema not in ('pg_catalog', 'information_schema') + AND NOT EXISTS ( + SELECT 1 + FROM information_schema.key_column_usage kcu + WHERE kcu.table_name = tbl.table_name + AND kcu.table_schema = tbl.table_schema + ) + ) + SELECT oid::regclass FROM tables_no_pkey INNER JOIN pg_class ON oid::regclass = table_name::regclass + WHERE relreplident = 'd'; + """ + + def _list_tables_with_missing_replica_identities_txn(txn: LoggingTransaction) -> List[str]: + txn.execute(sql) + return [table_name for table_name, in txn] + + for pool in self.db_pools: + missing = self.get_success(pool.runInteraction("test_list_missing_replica_identities", _list_tables_with_missing_replica_identities_txn)) + self.assertTrue(len(missing) == 0, f"The following tables in the {pool.name()!r} database are missing REPLICA IDENTITIES: {missing!r}.")