From 66f7dc8c87a6c585b4d456c5b2c4999452fc82c2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Aug 2018 00:32:39 +0100 Subject: [PATCH 1/2] Fix logcontexts for running pushers First of all, avoid resetting the logcontext before running the pushers, to fix the "Starting db txn 'get_all_updated_receipts' from sentinel context" warning. Instead, give them their own "background process" logcontexts. --- synapse/app/pusher.py | 4 ++-- synapse/handlers/federation.py | 3 +-- synapse/handlers/message.py | 7 ++----- synapse/handlers/receipts.py | 18 ++++++++---------- synapse/push/pusherpool.py | 17 +++++++++++++++-- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9295a51d5b74..0089a7c64f54 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -162,11 +162,11 @@ def poke_pushers(self, stream_name, token, rows): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( + self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( + self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) except Exception: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f38b393e4ab4..3dd107a2850b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2386,8 +2386,7 @@ def _notify_persisted_event(self, event, max_stream_id): extra_users=extra_users ) - logcontext.run_in_background( - self.pusher_pool.on_new_notifications, + self.pusher_pool.on_new_notifications( event_stream_id, max_stream_id, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 893c9bcdc4db..f21d740968bf 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -774,11 +774,8 @@ def is_inviter_member_event(e): event, context=context ) - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id + self.pusher_pool.on_new_notifications( + event_stream_id, max_stream_id, ) def _notify(): diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index cb905a390343..a6f3181f099b 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -18,7 +18,6 @@ from synapse.types import get_domain_from_id from synapse.util import logcontext -from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -116,16 +115,15 @@ def _handle_new_receipts(self, receipts): affected_room_ids = list(set([r["room_id"] for r in receipts])) - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_batch_id, rooms=affected_room_ids - ) - # Note that the min here shouldn't be relied upon to be accurate. - self.hs.get_pusherpool().on_new_receipts( - min_batch_id, max_batch_id, affected_room_ids - ) + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + # Note that the min here shouldn't be relied upon to be accurate. + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids, + ) - defer.returnValue(True) + defer.returnValue(True) @logcontext.preserve_fn # caller should not yield on this @defer.inlineCallbacks diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 36bb5bbc6543..9f7d5ef217db 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,6 +18,7 @@ from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push.pusher import PusherFactory from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -122,8 +123,14 @@ def remove_pushers_by_access_token(self, user_id, access_tokens): p['app_id'], p['pushkey'], p['user_name'], ) - @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): + run_as_background_process( + "on_new_notifications", + self._on_new_notifications, min_stream_id, max_stream_id, + ) + + @defer.inlineCallbacks + def _on_new_notifications(self, min_stream_id, max_stream_id): try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id @@ -147,8 +154,14 @@ def on_new_notifications(self, min_stream_id, max_stream_id): except Exception: logger.exception("Exception in pusher on_new_notifications") - @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + run_as_background_process( + "on_new_receipts", + self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids, + ) + + @defer.inlineCallbacks + def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive From 4c9da1440f9ab26b79e5efc2a0f774916f18aeba Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Aug 2018 00:50:36 +0100 Subject: [PATCH 2/2] changelog --- changelog.d/3710.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3710.bugfix diff --git a/changelog.d/3710.bugfix b/changelog.d/3710.bugfix new file mode 100644 index 000000000000..c28e17766770 --- /dev/null +++ b/changelog.d/3710.bugfix @@ -0,0 +1 @@ +Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning \ No newline at end of file