diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 9b208668b615..46e768e35cb0 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ def __init__(self, hs): def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "handle_push_actions_for_event"): bulk_evaluator = yield evaluator_for_event( - event, self.hs, self.store + event, self.hs, self.store, context.current_state ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 25f2fb9da4fd..8c59e59e037a 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -21,7 +21,7 @@ from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.visibility import filter_events_for_clients @@ -72,28 +72,32 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks -def evaluator_for_event(event, hs, store): +def evaluator_for_event(event, hs, store, current_state): room_id = event.room_id - - # users in the room who have pushers need to get push rules run because - # that's how their pushers work - users_with_pushers = yield store.get_users_with_pushers_in_room(room_id) - # We also will want to generate notifs for other people in the room so # their unread countss are correct in the event stream, but to avoid # generating them for bot / AS users etc, we only do so for people who've # sent a read receipt into the room. - all_in_room = yield store.get_users_in_room(room_id) - all_in_room = set(all_in_room) + all_in_room = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + ) - receipts = yield store.get_receipts_for_room(room_id, "m.read") + # users in the room who have pushers need to get push rules run because + # that's how their pushers work + if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room) + users_with_pushers = set( + uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher + ) + + users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id) # any users with pushers must be ours: they have pushers user_ids = set(users_with_pushers) - for r in receipts: - if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room: - user_ids.add(r['user_id']) + for uid in users_with_receipts: + if hs.is_mine_id(uid) and uid in all_in_room: + user_ids.add(uid) # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited @@ -143,7 +147,10 @@ def action_for_event_by_user(self, event, current_state): self.store, user_tuples, [event], {event.event_id: current_state} ) - room_members = yield self.store.get_users_in_room(self.room_id) + room_members = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + ) evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4655669ba03c..2b3f79577bf1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -342,9 +342,6 @@ def _persist_event_txn(self, txn, event, context, current_state, backfilled=Fals txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 9e8e2e29646a..39d5349eaa77 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,7 +18,7 @@ from canonicaljson import encode_canonical_json -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList import logging import simplejson as json @@ -135,19 +135,35 @@ def get_all_updated_pushers_txn(txn): "get_all_updated_pushers", get_all_updated_pushers_txn ) - @cachedInlineCallbacks(num_args=1) - def get_users_with_pushers_in_room(self, room_id): - users = yield self.get_users_in_room(room_id) - + @cachedInlineCallbacks(lru=True, num_args=1) + def get_if_user_has_pusher(self, user_id): result = yield self._simple_select_many_batch( + table='pushers', + keyvalues={ + 'user_name': 'user_id', + }, + retcol='user_name', + desc='get_if_user_has_pusher', + allow_none=True, + ) + + defer.returnValue(bool(result)) + + @cachedList(cached_method_name="get_if_user_has_pusher", + list_name="user_ids", num_args=1, inlineCallbacks=True) + def get_if_users_have_pushers(self, user_ids): + rows = yield self._simple_select_many_batch( table='pushers', column='user_name', - iterable=users, + iterable=user_ids, retcols=['user_name'], - desc='get_users_with_pushers_in_room' + desc='get_if_users_have_pushers' ) - defer.returnValue([r['user_name'] for r in result]) + result = {user_id: False for user_id in user_ids} + result.update({r['user_name']: True for r in rows}) + + defer.returnValue(result) @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, @@ -178,16 +194,16 @@ def f(txn): }, ) if newly_inserted: - # get_users_with_pushers_in_room only cares if the user has + # get_if_user_has_pusher only cares if the user has # at least *one* pusher. - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,)) self._simple_delete_one_txn( txn, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fdcf28f3e1ba..964f30dff752 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -34,6 +34,26 @@ def __init__(self, hs): "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() ) + @cachedInlineCallbacks() + def get_users_with_read_receipts_in_room(self, room_id): + receipts = yield self.get_receipts_for_room(room_id, "m.read") + defer.returnValue(set(r['user_id'] for r in receipts)) + + def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type, + user_id): + if receipt_type != "m.read": + return + + # Returns an ObservableDeferred + res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) + + if res and res.called and user_id in res.result: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return + + self.get_users_with_read_receipts_in_room.invalidate((room_id,)) + @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): return self._simple_select_list( @@ -228,6 +248,10 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) + txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) @@ -373,6 +397,10 @@ def insert_graph_receipt_txn(self, txn, room_id, receipt_type, txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) + txn.call_after( + self._invalidate_get_users_with_receipts_in_room, + room_id, receipt_type, user_id, + ) txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index face685ed2c7..41b395e07cd0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,9 +58,6 @@ def _store_room_members_txn(self, txn, events, backfilled): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after( - self.get_users_with_pushers_in_room.invalidate, (event.room_id,) - ) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering