diff --git a/changelog.d/12192.misc b/changelog.d/12192.misc new file mode 100644 index 000000000000..bdfe8dad98a6 --- /dev/null +++ b/changelog.d/12192.misc @@ -0,0 +1 @@ +Rename `HomeServer.get_tcp_replication` to `get_replication_command_handler`. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1536a4272333..a10a63b06c7e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -417,7 +417,7 @@ def start_listening(self) -> None: else: logger.warning("Unsupported listener type: %s", listener.type) - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) def start(config_options: List[str]) -> None: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a6789a840ede..e4dc04c0b40f 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -273,7 +273,7 @@ def start_listening(self) -> None: # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client # rather than a server). - self.get_tcp_replication().start_replication(self) + self.get_replication_command_handler().start_replication(self) for listener in self.config.server.listeners: if listener.type == "http": diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 87e99c7ddf5b..2529dee613aa 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -63,7 +63,7 @@ def __init__(self, hs: "HomeServer"): self.replication_client = None if hs.config.worker.worker_app: - self.replication_client = hs.get_tcp_replication() + self.replication_client = hs.get_replication_command_handler() # A method just so we can pass 'self' as the authenticator to the Servlets async def authenticate_request( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c155098beeb8..9927a30e6ed5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -424,13 +424,13 @@ def __init__(self, hs: "HomeServer"): async def _on_shutdown(self) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_command( + self.hs.get_replication_command_handler().send_command( ClearUserSyncsCommand(self.instance_id) ) def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: if self._presence_enabled: - self.hs.get_tcp_replication().send_user_sync( + self.hs.get_replication_command_handler().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index b5b84c09ae41..14706a081755 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -54,6 +54,6 @@ async def insert_client_ip( self.client_ip_last_seen.set(key, now) - self.hs.get_tcp_replication().send_user_ip( + self.hs.get_replication_command_handler().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b8fc1d4db95e..deeaaec4e66c 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -462,6 +462,8 @@ async def _save_and_send_ack(self) -> None: # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack(current_position) + self._hs.get_replication_command_handler().send_federation_ack( + current_position + ) except Exception: logger.exception("Error updating federation stream position") diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0d2013a3cfc5..d51f045f229a 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -295,9 +295,7 @@ async def _process_command( raise Exception("Unrecognised command %s in stream queue", cmd.NAME) def start_replication(self, hs: "HomeServer") -> None: - """Helper method to start a replication connection to the remote server - using TCP. - """ + """Helper method to start replication.""" if hs.config.redis.redis_enabled: from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b84e572da136..989c5be0327e 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -325,7 +325,7 @@ def __init__( password=hs.config.redis.redis_password, ) - self.synapse_handler = hs.get_tcp_replication() + self.synapse_handler = hs.get_replication_command_handler() self.synapse_stream_name = hs.hostname self.synapse_outbound_redis_connection = outbound_redis_connection diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 494e42a2be8f..ab829040cde9 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory): """Factory for new replication connections.""" def __init__(self, hs: "HomeServer"): - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() self.clock = hs.get_clock() self.server_name = hs.config.server.server_name @@ -85,7 +85,7 @@ def __init__(self, hs: "HomeServer"): self.is_looping = False self.pending_updates = False - self.command_handler = hs.get_tcp_replication() + self.command_handler = hs.get_replication_command_handler() # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() diff --git a/synapse/server.py b/synapse/server.py index 46a64418ea0c..1270abb5a335 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -639,7 +639,7 @@ def get_read_marker_handler(self) -> ReadMarkerHandler: return ReadMarkerHandler(self) @cache_in_self - def get_tcp_replication(self) -> ReplicationCommandHandler: + def get_replication_command_handler(self) -> ReplicationCommandHandler: return ReplicationCommandHandler(self) @cache_in_self diff --git a/tests/replication/_base.py b/tests/replication/_base.py index a7a05a564fe9..9c5df266bd1f 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -251,7 +251,7 @@ def setUp(self): self.connect_any_redis_attempts, ) - self.hs.get_tcp_replication().start_replication(self.hs) + self.hs.get_replication_command_handler().start_replication(self.hs) # When we see a connection attempt to the master replication listener we # automatically set up the connection. This is so that tests don't @@ -375,7 +375,7 @@ def make_worker_hs( ) if worker_hs.config.redis.redis_enabled: - worker_hs.get_tcp_replication().start_replication(worker_hs) + worker_hs.get_replication_command_handler().start_replication(worker_hs) return worker_hs diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index f9d5da723cce..641a94133b1d 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -420,7 +420,7 @@ def test_backwards_stream_id(self): # Manually send an old RDATA command, which should get dropped. This # re-uses the row from above, but with an earlier stream token. - self.hs.get_tcp_replication().send_command( + self.hs.get_replication_command_handler().send_command( RdataCommand("events", "master", 1, row) ) diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py index 3ff5afc6e501..9a229dd23f4a 100644 --- a/tests/replication/tcp/streams/test_typing.py +++ b/tests/replication/tcp/streams/test_typing.py @@ -118,7 +118,7 @@ def test_reset(self): # Reset the typing handler self.hs.get_replication_streams()["typing"].last_token = 0 - self.hs.get_tcp_replication()._streams["typing"].last_token = 0 + self.hs.get_replication_command_handler()._streams["typing"].last_token = 0 typing._latest_room_serial = 0 typing._typing_stream_change_cache = StreamChangeCache( "TypingStreamChangeCache", typing._latest_room_serial diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 1b6a4bf4b0b1..26b8bd512a7f 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -48,7 +48,7 @@ def test_federation_ack_sent(self): transport, rather than assuming that the implementation has a ReplicationCommandHandler. """ - rch = self.hs.get_tcp_replication() + rch = self.hs.get_replication_command_handler() # wire up the ReplicationCommandHandler to a mock connection, which needs # to implement IReplicationConnection. (Note that Mock doesn't understand