From 0e8d78f6aa56020ffb948a4b2c6feadba2d16712 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Aug 2018 00:43:43 +0100 Subject: [PATCH 1/2] Logcontexts for replication command handlers Run the handlers for replication commands as background processes. This should improve the visibility in our metrics, and reduce the number of "running db transaction from sentinel context" warnings. Ideally it means converting the things that fire off deferreds into the night into things that actually return a Deferred when they are done. I've made a bit of a stab at this, but it will probably be leaky. --- synapse/app/appservice.py | 3 ++- synapse/app/federation_sender.py | 3 ++- synapse/app/pusher.py | 3 ++- synapse/app/synchrotron.py | 3 ++- synapse/app/user_dir.py | 3 ++- synapse/replication/tcp/client.py | 4 +-- synapse/replication/tcp/commands.py | 12 +++++++++ synapse/replication/tcp/protocol.py | 42 ++++++++++++++++++++--------- 8 files changed, 53 insertions(+), 20 deletions(-) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a37384fb7b5..3348a8ec6dbc 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -117,8 +117,9 @@ def __init__(self, hs): super(ASReplicationHandler, self).__init__(hs.get_datastore()) self.appservice_handler = hs.get_application_service_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7a4310ca1828..d59007099bec 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -144,8 +144,9 @@ def __init__(self, hs): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) self.send_handler = FederationSenderHandler(hs, self) + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(FederationSenderReplicationHandler, self).on_rdata( + yield super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9295a51d5b74..aa0938c376d6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -148,8 +148,9 @@ def __init__(self, hs): self.pusher_pool = hs.get_pusherpool() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efdc5..39c7cbc1bae3 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -332,8 +332,9 @@ def __init__(self, hs): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index cb78de88340a..1388a42b59aa 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -169,8 +169,9 @@ def __init__(self, hs): super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) self.user_directory = hs.get_user_directory_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(UserDirectoryReplicationHandler, self).on_rdata( + yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) if stream_name == "current_state_deltas": diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 970e94313e31..cbe964581711 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -107,7 +107,7 @@ def on_rdata(self, stream_name, token, rows): Can be overriden in subclasses to handle more. """ logger.info("Received rdata %s -> %s", stream_name, token) - self.store.process_replication_rows(stream_name, token, rows) + return self.store.process_replication_rows(stream_name, token, rows) def on_position(self, stream_name, token): """Called when we get new position data. By default this just pokes @@ -115,7 +115,7 @@ def on_position(self, stream_name, token): Can be overriden in subclasses to handle more. """ - self.store.process_replication_rows(stream_name, token, []) + return self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): """When we received a SYNC we wake up any deferreds that were waiting diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index f3908df642b5..327556f6a1e6 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -59,6 +59,12 @@ def to_line(self): """ return self.data + def get_logcontext_id(self): + """Get a suitable string for the logcontext when processing this command""" + + # by default, we just use the command name. + return self.NAME + class ServerCommand(Command): """Sent by the server on new connection and includes the server_name. @@ -116,6 +122,9 @@ def to_line(self): _json_encoder.encode(self.row), )) + def get_logcontext_id(self): + return "RDATA-" + self.stream_name + class PositionCommand(Command): """Sent by the client to tell the client the stream postition without @@ -190,6 +199,9 @@ def from_line(cls, line): def to_line(self): return " ".join((self.stream_name, str(self.token),)) + def get_logcontext_id(self): + return "REPLICATE-" + self.stream_name + class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index dec5ac091326..74e892c10416 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -63,6 +63,8 @@ from twisted.python.failure import Failure from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from .commands import ( @@ -222,7 +224,11 @@ def lineReceived(self, line): # Now lets try and call on_ function try: - getattr(self, "on_%s" % (cmd_name,))(cmd) + run_as_background_process( + "replication-" + cmd.get_logcontext_id(), + getattr(self, "on_%s" % (cmd_name,)), + cmd, + ) except Exception: logger.exception("[%s] Failed to handle line: %r", self.id(), line) @@ -387,7 +393,7 @@ def on_NAME(self, cmd): self.name = cmd.data def on_USER_SYNC(self, cmd): - self.streamer.on_user_sync( + return self.streamer.on_user_sync( self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms, ) @@ -397,22 +403,33 @@ def on_REPLICATE(self, cmd): if stream_name == "ALL": # Subscribe to all streams we're publishing to. - for stream in iterkeys(self.streamer.streams_by_name): - self.subscribe_to_stream(stream, token) + deferreds = [ + run_in_background( + self.subscribe_to_stream, + stream, token, + ) + for stream in iterkeys(self.streamer.streams_by_name) + ] + + return make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True) + ) else: - self.subscribe_to_stream(stream_name, token) + return self.subscribe_to_stream(stream_name, token) def on_FEDERATION_ACK(self, cmd): - self.streamer.federation_ack(cmd.token) + return self.streamer.federation_ack(cmd.token) def on_REMOVE_PUSHER(self, cmd): - self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) + return self.streamer.on_remove_pusher( + cmd.app_id, cmd.push_key, cmd.user_id, + ) def on_INVALIDATE_CACHE(self, cmd): - self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) def on_USER_IP(self, cmd): - self.streamer.on_user_ip( + return self.streamer.on_user_ip( cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id, cmd.last_seen, ) @@ -542,14 +559,13 @@ def on_RDATA(self, cmd): # Check if this is the last of a batch of updates rows = self.pending_batches.pop(stream_name, []) rows.append(row) - - self.handler.on_rdata(stream_name, cmd.token, rows) + return self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): - self.handler.on_position(cmd.stream_name, cmd.token) + return self.handler.on_position(cmd.stream_name, cmd.token) def on_SYNC(self, cmd): - self.handler.on_sync(cmd.data) + return self.handler.on_sync(cmd.data) def replicate(self, stream_name, token): """Send the subscription request to the server From d9efd87d55fb7766ba332bedbecd5efe1ca7067b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Aug 2018 00:49:22 +0100 Subject: [PATCH 2/2] changelog --- changelog.d/3709.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3709.misc diff --git a/changelog.d/3709.misc b/changelog.d/3709.misc new file mode 100644 index 000000000000..bbda357d4481 --- /dev/null +++ b/changelog.d/3709.misc @@ -0,0 +1 @@ +Logcontexts for replication command handlers