Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rename get_tcp_replication. #12192

Merged
merged 4 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12192.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rename `HomeServer.get_tcp_replication` to `get_replication_command_handler`.
2 changes: 1 addition & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def start_listening(self) -> None:
else:
logger.warning("Unsupported listener type: %s", listener.type)

self.get_tcp_replication().start_replication(self)
self.get_replication_command_handler().start_replication(self)


def start(config_options: List[str]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def start_listening(self) -> None:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
self.get_replication_command_handler().start_replication(self)

for listener in self.config.server.listeners:
if listener.type == "http":
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, hs: "HomeServer"):

self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()
self.replication_client = hs.get_replication_command_handler()

# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,13 @@ def __init__(self, hs: "HomeServer"):

async def _on_shutdown(self) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ async def insert_client_ip(

self.client_ip_last_seen.set(key, now)

self.hs.get_tcp_replication().send_user_ip(
self.hs.get_replication_command_handler().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
4 changes: 3 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ async def _save_and_send_ack(self) -> None:

# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position)
self._hs.get_replication_command_handler().send_federation_ack(
current_position
)
except Exception:
logger.exception("Error updating federation stream position")
4 changes: 1 addition & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ async def _process_command(
raise Exception("Unrecognised command %s in stream queue", cmd.NAME)

def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start a replication connection to the remote server
using TCP.
"""
"""Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def __init__(
password=hs.config.redis.redis_password,
)

self.synapse_handler = hs.get_tcp_replication()
self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname

self.synapse_outbound_redis_connection = outbound_redis_connection
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ReplicationStreamProtocolFactory(ServerFactory):
"""Factory for new replication connections."""

def __init__(self, hs: "HomeServer"):
self.command_handler = hs.get_tcp_replication()
self.command_handler = hs.get_replication_command_handler()
self.clock = hs.get_clock()
self.server_name = hs.config.server.server_name

Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(self, hs: "HomeServer"):
self.is_looping = False
self.pending_updates = False

self.command_handler = hs.get_tcp_replication()
self.command_handler = hs.get_replication_command_handler()

# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
Expand Down
2 changes: 1 addition & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ def get_read_marker_handler(self) -> ReadMarkerHandler:
return ReadMarkerHandler(self)

@cache_in_self
def get_tcp_replication(self) -> ReplicationCommandHandler:
def get_replication_command_handler(self) -> ReplicationCommandHandler:
return ReplicationCommandHandler(self)

@cache_in_self
Expand Down
4 changes: 2 additions & 2 deletions tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def setUp(self):
self.connect_any_redis_attempts,
)

self.hs.get_tcp_replication().start_replication(self.hs)
self.hs.get_replication_command_handler().start_replication(self.hs)

# When we see a connection attempt to the master replication listener we
# automatically set up the connection. This is so that tests don't
Expand Down Expand Up @@ -375,7 +375,7 @@ def make_worker_hs(
)

if worker_hs.config.redis.redis_enabled:
worker_hs.get_tcp_replication().start_replication(worker_hs)
worker_hs.get_replication_command_handler().start_replication(worker_hs)

return worker_hs

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def test_backwards_stream_id(self):

# Manually send an old RDATA command, which should get dropped. This
# re-uses the row from above, but with an earlier stream token.
self.hs.get_tcp_replication().send_command(
self.hs.get_replication_command_handler().send_command(
RdataCommand("events", "master", 1, row)
)

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_reset(self):

# Reset the typing handler
self.hs.get_replication_streams()["typing"].last_token = 0
self.hs.get_tcp_replication()._streams["typing"].last_token = 0
self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
Expand Down
2 changes: 1 addition & 1 deletion tests/replication/test_federation_ack.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_federation_ack_sent(self):
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
rch = self.hs.get_tcp_replication()
rch = self.hs.get_replication_command_handler()

# wire up the ReplicationCommandHandler to a mock connection, which needs
# to implement IReplicationConnection. (Note that Mock doesn't understand
Expand Down