From 6b2eab6b3f3582abce85d4af5d48b08a92489c04 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 16 Jul 2020 23:37:03 +0100
Subject: [PATCH 01/12] Allow command-processing methods to be synchronous

... and only start a background process if they are async.
---
 synapse/replication/tcp/protocol.py | 42 +++++++++++++++++++----------
 synapse/replication/tcp/redis.py    | 40 +++++++++++++--------------
 2 files changed, 48 insertions(+), 34 deletions(-)

diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index ca47f5cc88f1..566afe783b7a 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -124,6 +124,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
     On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
     command before delegating to `ReplicationCommandHandler.on_<COMMAND_NAME>`.
+    `ReplicationCommandHandler.on_<COMMAND_NAME>` can optionally return a coroutine;
+    if so, that will get run as a background process.
 
     It also sends `PING` periodically, and correctly times out remote connections
     (if they send a `PING` command)
@@ -232,18 +234,17 @@ def lineReceived(self, line: bytes):
 
         tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
 
-        # Now lets try and call on_<COMMAND_NAME> function
-        run_as_background_process(
-            "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
-        )
+        self.handle_command(cmd)
 
-    async def handle_command(self, cmd: Command):
+    def handle_command(self, cmd: Command) -> None:
         """Handle a command we have received over the replication stream.
 
         First calls `self.on_<COMMAND_NAME>` if it exists, then calls
-        `self.command_handler.on_<COMMAND_NAME>` if it exists. This allows for
-        protocol level handling of commands (e.g. PINGs), before delegating to
-        the handler.
+        `self.command_handler.on_<COMMAND_NAME>` if it exists (which can optionally
+        return an Awaitable).
+
+        This allows for protocol level handling of commands (e.g. PINGs), before
+        delegating to the handler.
 
         Args:
             cmd: received command
@@ -254,13 +255,26 @@ async def handle_command(self, cmd: Command):
         # specific handling.
         cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
         if cmd_func:
-            await cmd_func(cmd)
+            cmd_func(cmd)
             handled = True
 
         # Then call out to the handler.
         cmd_func = getattr(self.command_handler, "on_%s" % (cmd.NAME,), None)
        if cmd_func:
-            await cmd_func(self, cmd)
+            res = cmd_func(self, cmd)
+
+            # the handler might be a coroutine: fire it off as a background process
+            # if so.
+
+            if hasattr(res, "__await__"):
+
+                async def handle_command():
+                    await res
+
+                run_as_background_process(
+                    "replication-" + cmd.get_logcontext_id(), handle_command
+                )
+
             handled = True
 
         if not handled:
@@ -336,10 +350,10 @@ def _send_pending_commands(self):
         for cmd in pending:
             self.send_command(cmd)
 
-    async def on_PING(self, line):
+    def on_PING(self, line):
         self.received_ping = True
 
-    async def on_ERROR(self, cmd):
+    def on_ERROR(self, cmd):
         logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
 
     def pauseProducing(self):
@@ -431,7 +445,7 @@ def connectionMade(self):
         self.send_command(ServerCommand(self.server_name))
         super().connectionMade()
 
-    async def on_NAME(self, cmd):
+    def on_NAME(self, cmd):
         logger.info("[%s] Renamed to %r", self.id(), cmd.data)
         self.name = cmd.data
 
@@ -460,7 +474,7 @@ def connectionMade(self):
         # Once we've connected subscribe to the necessary streams
         self.replicate()
 
-    async def on_SERVER(self, cmd):
+    def on_SERVER(self, cmd):
         if cmd.data != self.server_name:
             logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
             self.send_error("Wrong remote")
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 0a7e7f67be74..f6250bf1f7d3 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -109,36 +109,36 @@ def messageReceived(self, pattern: str, channel: str, message: str):
         # remote instances.
         tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()
 
-        # Now lets try and call on_<COMMAND_NAME> function
-        run_as_background_process(
-            "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
-        )
+        self.handle_command(cmd)
 
-    async def handle_command(self, cmd: Command):
+    def handle_command(self, cmd: Command) -> None:
         """Handle a command we have received over the replication stream.
 
-        By default delegates to on_<COMMAND_NAME>, which should return an awaitable.
+        Delegates to `self.handler.on_<COMMAND_NAME>` (which can optionally return an
+        Awaitable).
 
         Args:
             cmd: received command
         """
-        handled = False
-
-        # First call any command handlers on this instance. These are for redis
-        # specific handling.
-        cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
-        if cmd_func:
-            await cmd_func(cmd)
-            handled = True
-
         # Then call out to the handler.
         cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
-        if cmd_func:
-            await cmd_func(self, cmd)
-            handled = True
-
-        if not handled:
+        if not cmd_func:
             logger.warning("Unhandled command: %r", cmd)
+            return
+
+        res = cmd_func(self, cmd)
+
+        # the handler might be a coroutine: fire it off as a background process
+        # if so.
+
+        if hasattr(res, "__await__"):
+
+            async def handle_command():
+                await res
+
+            run_as_background_process(
+                "replication-" + cmd.get_logcontext_id(), handle_command
+            )
 
     def connectionLost(self, reason):
         logger.info("Lost connection to redis")

From ebdc8a548ba833234c615c981f4e7e9b4a115bcb Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 16 Jul 2020 23:43:06 +0100
Subject: [PATCH 02/12] Make a bunch of command handler methods synchronous

---
 synapse/replication/tcp/handler.py | 73 ++++++++++++++++++------------
 1 file changed, 44 insertions(+), 29 deletions(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index f88e0a2e404e..01b91520b234 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -16,6 +16,7 @@
 import logging
 from typing import (
     Any,
+    Awaitable,
     Dict,
     Iterable,
     Iterator,
@@ -301,7 +302,7 @@ def get_streams_to_replicate(self) -> List[Stream]:
         """
         return self._streams_to_replicate
 
-    async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
+    def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
         self.send_positions_to_connection(conn)
 
     def send_positions_to_connection(self, conn: AbstractConnection):
@@ -320,55 +321,71 @@ def send_positions_to_connection(self, conn: AbstractConnection):
             )
         )
 
-    async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand):
+    async def on_USER_SYNC(
+        self, conn: AbstractConnection, cmd: UserSyncCommand
+    ) -> Optional[Awaitable[None]]:
         user_sync_counter.inc()
 
         if self._is_master:
-            await self._presence_handler.update_external_syncs_row(
+            return self._presence_handler.update_external_syncs_row(
                 cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
             )
+        else:
+            return None
 
-    async def on_CLEAR_USER_SYNC(
+    def on_CLEAR_USER_SYNC(
         self, conn: AbstractConnection, cmd: ClearUserSyncsCommand
-    ):
+    ) -> Optional[Awaitable[None]]:
         if self._is_master:
-            await self._presence_handler.update_external_syncs_clear(cmd.instance_id)
+            return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
+        else:
+            return None
 
-    async def on_FEDERATION_ACK(
-        self, conn: AbstractConnection, cmd: FederationAckCommand
-    ):
+    def on_FEDERATION_ACK(self, conn: AbstractConnection, cmd: FederationAckCommand):
         federation_ack_counter.inc()
 
         if self._federation_sender:
             self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
 
-    async def on_REMOVE_PUSHER(
+    def on_REMOVE_PUSHER(
         self, conn: AbstractConnection, cmd: RemovePusherCommand
-    ):
+    ) -> Optional[Awaitable[None]]:
         remove_pusher_counter.inc()
 
         if self._is_master:
-            await self._store.delete_pusher_by_app_id_pushkey_user_id(
-                app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
-            )
+            return self._handle_remove_pusher(cmd)
+        else:
+            return None
 
-            self._notifier.on_new_replication_data()
+    async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
+        await self._store.delete_pusher_by_app_id_pushkey_user_id(
+            app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
+        )
+
+        self._notifier.on_new_replication_data()
 
-    async def on_USER_IP(self, conn: AbstractConnection, cmd: UserIpCommand):
+    def on_USER_IP(
+        self, conn: AbstractConnection, cmd: UserIpCommand
+    ) -> Optional[Awaitable[None]]:
         user_ip_cache_counter.inc()
 
         if self._is_master:
-            await self._store.insert_client_ip(
-                cmd.user_id,
-                cmd.access_token,
-                cmd.ip,
-                cmd.user_agent,
-                cmd.device_id,
-                cmd.last_seen,
-            )
+            return self._handle_user_ip(cmd)
+        else:
+            return None
+
+    async def _handle_user_ip(self, cmd: UserIpCommand):
+        await self._store.insert_client_ip(
+            cmd.user_id,
+            cmd.access_token,
+            cmd.ip,
+            cmd.user_agent,
+            cmd.device_id,
+            cmd.last_seen,
+        )
 
-            if self._server_notices_sender:
-                await self._server_notices_sender.on_user_ip(cmd.user_id)
+        assert self._server_notices_sender is not None
+        await self._server_notices_sender.on_user_ip(cmd.user_id)
 
     async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
         if cmd.instance_name == self._instance_name:
             # Ignore RDATA that are just our own echoes
             return
@@ -528,9 +545,7 @@ async def _process_position(
 
         self._streams_by_connection.setdefault(conn, set()).add(stream_name)
 
-    async def on_REMOTE_SERVER_UP(
-        self, conn: AbstractConnection, cmd: RemoteServerUpCommand
-    ):
+    def on_REMOTE_SERVER_UP(self, conn: AbstractConnection, cmd: RemoteServerUpCommand):
         """"Called when get a new REMOTE_SERVER_UP command."""
         self._replication_data_handler.on_remote_server_up(cmd.data)

From c0ee62c8feb290c88c01dba1100510c2c9d98b11 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 16 Jul 2020 23:45:22 +0100
Subject: [PATCH 03/12] make on_POSITION and on_RDATA synchronous

fire up a background process only when we start processing the queue.
---
 synapse/replication/tcp/handler.py | 40 ++++++++++++++++----------------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 01b91520b234..cab1b683a60e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -34,6 +34,7 @@
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
@@ -155,7 +156,7 @@ def __init__(self, hs):
         # When POSITION or RDATA commands arrive, we stick them in a queue and process
         # them in order in a separate background process.
 
-        # the streams which are currently being processed by _unsafe_process_stream
+        # the streams which are currently being processed by _unsafe_process_queue
         self._processing_streams = set()  # type: Set[str]
 
         # for each stream, a queue of commands that are awaiting processing, and the
@@ -188,7 +189,7 @@ def __init__(self, hs):
         if self._is_master:
             self._server_notices_sender = hs.get_server_notices_sender()
 
-    async def _add_command_to_stream_queue(
+    def _add_command_to_stream_queue(
         self, conn: AbstractConnection, cmd: Union[RdataCommand, PositionCommand]
     ) -> None:
         """Queue the given received command for processing
@@ -202,33 +203,32 @@ async def _add_command_to_stream_queue(
             logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
             return
 
-        # if we're already processing this stream, stick the new command in the
-        # queue, and we're done.
+        queue.append((cmd, conn))
+
+        # if we're already processing this stream, there's nothing more to do:
+        # the new entry on the queue will get picked up in due course
         if stream_name in self._processing_streams:
-            queue.append((cmd, conn))
             return
 
-        # otherwise, process the new command.
+        # fire off a background process to start processing the queue.
+        run_as_background_process(
+            "process-replication-data", self._unsafe_process_queue, conn, cmd
+        )
 
-        # arguably we should start off a new background process here, but nothing
-        # will be too upset if we don't return for ages, so let's save the overhead
-        # and use the existing logcontext.
+    async def _unsafe_process_queue(self, stream_name: str):
+        """Processes the command queue for the given stream, until it is empty
+
+        Does not check if there is already a thread processing the queue, hence "unsafe"
+        """
         self._processing_streams.add(stream_name)
 
         try:
-            # might as well skip the queue for this one, since it must be empty
-            assert not queue
-            await self._process_command(cmd, conn, stream_name)
-
-            # now process any other commands that have built up while we were
-            # dealing with that one.
+            queue = self._command_queues_by_stream.get(stream_name)
             while queue:
                 cmd, conn = queue.popleft()
                 try:
                     await self._process_command(cmd, conn, stream_name)
                 except Exception:
                     logger.exception("Failed to handle command %s", cmd)
-
         finally:
             self._processing_streams.discard(stream_name)
@@ -387,7 +387,7 @@ async def _handle_user_ip(self, cmd: UserIpCommand):
         assert self._server_notices_sender is not None
         await self._server_notices_sender.on_user_ip(cmd.user_id)
 
-    async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
+    def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
         if cmd.instance_name == self._instance_name:
             # Ignore RDATA that are just our own echoes
             return
@@ -401,7 +401,7 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
         # 2. so we don't race with getting a POSITION command and fetching
         #    missing RDATA.
 
-        await self._add_command_to_stream_queue(conn, cmd)
+        self._add_command_to_stream_queue(conn, cmd)
 
     async def _process_rdata(
         self, stream_name: str, conn: AbstractConnection, cmd: RdataCommand
@@ -478,14 +478,14 @@ async def on_rdata(
                 stream_name, instance_name, token, rows
             )
 
-    async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
+    def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
         if cmd.instance_name == self._instance_name:
             # Ignore POSITION that are just our own echoes
             return
 
         logger.info("Handling '%s %s'", cmd.NAME, cmd.to_line())
 
-        await self._add_command_to_stream_queue(conn, cmd)
+        self._add_command_to_stream_queue(conn, cmd)
 
     async def _process_position(
         self, stream_name: str, conn: AbstractConnection, cmd: PositionCommand

From ebee62dc00013e4c0a8644a96e7252bc6f6583a2 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 16 Jul 2020 23:46:14 +0100
Subject: [PATCH 04/12] newsfiles

---
 changelog.d/7861.misc | 2 +-
 changelog.d/7876.misc | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
 create mode 100644 changelog.d/7876.misc

diff --git a/changelog.d/7861.misc b/changelog.d/7861.misc
index ada616c62ffa..b9d591444331 100644
--- a/changelog.d/7861.misc
+++ b/changelog.d/7861.misc
@@ -1 +1 @@
-Optimise queueing of inbound replication commands.
+Optimise handling of inbound replication commands.
diff --git a/changelog.d/7876.misc b/changelog.d/7876.misc
new file mode 100644
index 000000000000..b9d591444331
--- /dev/null
+++ b/changelog.d/7876.misc
@@ -0,0 +1 @@
+Optimise handling of inbound replication commands.
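
The first four patches establish a pattern worth spelling out for readers who are not following the diffs line by line: a command handler may either do all of its work synchronously and return None, or return an awaitable, and only in the latter case is a background process started to drive it to completion. The following sketch illustrates that dispatch in isolation. It is not Synapse code: asyncio stands in for Twisted, run_as_background_process is reduced to a bare task spawn, and the handler and command names are purely illustrative.

    import asyncio
    from inspect import isawaitable

    def run_as_background_process(desc, func):
        # stand-in for Synapse's run_as_background_process: just spawn a task
        return asyncio.ensure_future(func())

    class Handler:
        def on_PING(self, cmd):
            # purely synchronous handler: nothing more to do once it returns
            print("handled PING:", cmd)

        async def on_RDATA(self, cmd):
            # asynchronous handler: must be driven to completion in the background
            await asyncio.sleep(0)
            print("handled RDATA:", cmd)

    def handle_command(handler, name, cmd):
        cmd_func = getattr(handler, "on_%s" % (name,), None)
        if not cmd_func:
            print("Unhandled command: %r" % (cmd,))
            return

        res = cmd_func(cmd)

        # the handler might have returned a coroutine: fire it off as a
        # background task if so
        if isawaitable(res):
            run_as_background_process("replication-" + name, lambda: res)

    async def main():
        handler = Handler()
        handle_command(handler, "PING", "p0")   # handled inline
        handle_command(handler, "RDATA", "r0")  # handled in the background
        await asyncio.sleep(0.1)  # give the background task a chance to finish

    asyncio.run(main())

The point of the arrangement, as in PATCH 01, is that the common synchronous case never pays the overhead of starting a background process, while handlers that genuinely need to await something are still driven to completion.
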
From 56d65770b9646c9f000d46caf9284a5156b96ba3 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 10:19:41 +0100
Subject: [PATCH 05/12] address review comments

---
 synapse/replication/tcp/handler.py  | 2 ++
 synapse/replication/tcp/protocol.py | 9 +++------
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cab1b683a60e..6077879ba8e0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -220,6 +220,8 @@ async def _unsafe_process_queue(self, stream_name: str):
 
         Does not check if there is already a thread processing the queue, hence "unsafe"
         """
+        assert stream_name not in self._processing_streams
+
         self._processing_streams.add(stream_name)
 
         try:
             queue = self._command_queues_by_stream.get(stream_name)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 566afe783b7a..92cf215f6611 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -50,6 +50,7 @@
 import fcntl
 import logging
 import struct
+from inspect import isawaitable
 from typing import TYPE_CHECKING, List
 
 from prometheus_client import Counter
@@ -266,13 +267,9 @@ def handle_command(self, cmd: Command) -> None:
             # the handler might be a coroutine: fire it off as a background process
             # if so.
 
-            if hasattr(res, "__await__"):
-
-                async def handle_command():
-                    await res
-
+            if isawaitable(res):
                 run_as_background_process(
-                    "replication-" + cmd.get_logcontext_id(), handle_command
+                    "replication-" + cmd.get_logcontext_id(), lambda: res
                 )
 
             handled = True

From 4d697f127e74d74e585791cb23f672ad36b5aa31 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 13:09:27 +0100
Subject: [PATCH 06/12] fix _unsafe_process_queue invocation

---
 synapse/replication/tcp/handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 6077879ba8e0..e1a308c1ae12 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -212,7 +212,7 @@ def _add_command_to_stream_queue(
 
         # fire off a background process to start processing the queue.
         run_as_background_process(
-            "process-replication-data", self._unsafe_process_queue, conn, cmd
+            "process-replication-data", self._unsafe_process_queue, stream_name
         )

From f6b3762f4e0ef5121956ec0b0f461a63d2c6443e Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 15:40:10 +0100
Subject: [PATCH 07/12] give replication stream a request name, for better logging

---
 synapse/replication/tcp/protocol.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index cb68e5c6323e..035092389843 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -169,9 +169,9 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
 
         # a logcontext which we use for processing incoming commands. We declare it as a
         # background process so that the CPU stats get reported to prometheus.
-        self._logging_context = BackgroundProcessLoggingContext(
-            "replication_command_handler-%s" % self.conn_id
-        )
+        ctx_name = "replication-conn-%s" % self.conn_id
+        self._logging_context = BackgroundProcessLoggingContext(ctx_name)
+        self._logging_context.request = ctx_name
 
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())

From 5759de925c443c485c0e81ba5150db98216580d3 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 15:49:38 +0100
Subject: [PATCH 08/12] fix on_USER_SYNC

---
 synapse/replication/tcp/handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 29d2b9e6a707..1c303f3a46d8 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -321,7 +321,7 @@ def send_positions_to_connection(self, conn: AbstractConnection):
             )
         )
 
-    async def on_USER_SYNC(
+    def on_USER_SYNC(
         self, conn: AbstractConnection, cmd: UserSyncCommand
     ) -> Optional[Awaitable[None]]:
         user_sync_counter.inc()

From fbdb15fdd2e450e17a95a70e3def2161435dba70 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 16:02:37 +0100
Subject: [PATCH 09/12] remove another redundant function instance

---
 synapse/replication/tcp/redis.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index a75bb5a0c13b..7707bbb21093 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -147,12 +147,8 @@ def handle_command(self, cmd: Command) -> None:
         # if so.
 
         if hasattr(res, "__await__"):
-
-            async def handle_command():
-                await res
-
             run_as_background_process(
-                "replication-" + cmd.get_logcontext_id(), handle_command
+                "replication-" + cmd.get_logcontext_id(), lambda: res
             )
 
     def connectionLost(self, reason):
         logger.info("Lost connection to redis")

From cc5c505f8805471e7f7cc3075d8207eda9707614 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Mon, 27 Jul 2020 17:42:33 +0100
Subject: [PATCH 10/12] Update synapse/replication/tcp/redis.py

Co-authored-by: Erik Johnston
---
 synapse/replication/tcp/redis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7707bbb21093..f98420ce3e37 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -146,7 +146,7 @@ def handle_command(self, cmd: Command) -> None:
         # the handler might be a coroutine: fire it off as a background process
         # if so.
 
-        if hasattr(res, "__await__"):
+        if isawaitable(res):
             run_as_background_process(
                 "replication-" + cmd.get_logcontext_id(), lambda: res
             )

From ed550cc6070efa6a2101dabe857659b584b72062 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 17:55:50 +0100
Subject: [PATCH 11/12] update newsfiles

---
 changelog.d/7861.misc   | 2 +-
 changelog.d/7876.bugfix | 1 +
 changelog.d/7876.misc   | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/7876.bugfix

diff --git a/changelog.d/7861.misc b/changelog.d/7861.misc
index b9d591444331..ada616c62ffa 100644
--- a/changelog.d/7861.misc
+++ b/changelog.d/7861.misc
@@ -1 +1 @@
-Optimise handling of inbound replication commands.
+Optimise queueing of inbound replication commands.
diff --git a/changelog.d/7876.bugfix b/changelog.d/7876.bugfix
new file mode 100644
index 000000000000..4ba2fadd5879
--- /dev/null
+++ b/changelog.d/7876.bugfix
@@ -0,0 +1 @@
+Fix an `AssertionError` exception introduced in v1.18.0rc1.
diff --git a/changelog.d/7876.misc b/changelog.d/7876.misc
index b9d591444331..5c78a158cdee 100644
--- a/changelog.d/7876.misc
+++ b/changelog.d/7876.misc
@@ -1 +1 @@
-Optimise handling of inbound replication commands.
+Further optimise queueing of inbound replication commands.

From c278cb3c22d7c6109c119fc6a0c1144f19ce79a2 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 27 Jul 2020 17:56:17 +0100
Subject: [PATCH 12/12] fix broken import

---
 synapse/replication/tcp/redis.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index f98420ce3e37..f225e533de5b 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from inspect import isawaitable
 from typing import TYPE_CHECKING
 
 import txredisapi
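
For readers following the queueing changes in PATCH 03, PATCH 05 and PATCH 06, the shape of the per-stream command queue is easier to see outside the diff context. The sketch below is not Synapse code: asyncio stands in for Twisted, the class and method names are illustrative, and the "stream is busy" bookkeeping is done before the task is scheduled (rather than inside the processing coroutine) because, unlike Twisted's ensureDeferred, an asyncio task does not start running synchronously.

    import asyncio
    from collections import deque
    from typing import Deque, Dict, Set

    class CommandQueuer:
        def __init__(self, stream_names):
            # one queue of pending commands per stream
            self._command_queues_by_stream: Dict[str, Deque[str]] = {
                name: deque() for name in stream_names
            }
            # streams which currently have a task draining their queue
            self._processing_streams: Set[str] = set()

        def add_command_to_stream_queue(self, stream_name: str, cmd: str) -> None:
            queue = self._command_queues_by_stream.get(stream_name)
            if queue is None:
                print("Got command for unknown stream: %s" % (stream_name,))
                return

            queue.append(cmd)

            # if we're already processing this stream, there's nothing more to
            # do: the new entry on the queue will get picked up in due course
            if stream_name in self._processing_streams:
                return

            # otherwise, mark the stream as busy and fire off a task to drain
            # the queue
            self._processing_streams.add(stream_name)
            asyncio.ensure_future(self._unsafe_process_queue(stream_name))

        async def _unsafe_process_queue(self, stream_name: str) -> None:
            # "unsafe" because it assumes the caller has already marked this
            # stream as being processed
            try:
                queue = self._command_queues_by_stream[stream_name]
                while queue:
                    cmd = queue.popleft()
                    try:
                        await self._process_command(stream_name, cmd)
                    except Exception:
                        print("Failed to handle command %s" % (cmd,))
            finally:
                self._processing_streams.discard(stream_name)

        async def _process_command(self, stream_name: str, cmd: str) -> None:
            await asyncio.sleep(0)
            print("processed %s on stream %s" % (cmd, stream_name))

    async def main():
        queuer = CommandQueuer(["events"])
        queuer.add_command_to_stream_queue("events", "RDATA 1")
        queuer.add_command_to_stream_queue("events", "RDATA 2")  # queued behind the first
        await asyncio.sleep(0.1)

    asyncio.run(main())

As in the patches, at most one task per stream is ever draining the queue, which keeps the commands for any given stream strictly ordered while still allowing different streams to be processed concurrently.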