Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
fix: Push notifications for invite over federation
Browse files Browse the repository at this point in the history
Signed-off-by: Kateřina Churanová <[email protected]>
  • Loading branch information
Kateřina Churanová committed Sep 6, 2022
1 parent 9d2823a commit 0b9dd4e
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 33 deletions.
13 changes: 10 additions & 3 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from typing_extensions import Literal
from unpaddedbase64 import encode_base64

from synapse.api.constants import RelationTypes
from synapse.api.constants import Membership, RelationTypes
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict, RoomStreamToken
from synapse.util.caches import intern_dict
Expand Down Expand Up @@ -339,12 +339,19 @@ def event_id(self) -> str:
raise NotImplementedError()

@property
def membership(self) -> str:
return self.content["membership"]
def membership(self) -> Optional[str]:
return self.content.get("membership")

def is_state(self) -> bool:
return self.get_state_key() is not None

@property
def is_notifiable(self) -> bool:
return (
self.membership == Membership.INVITE
or not self.internal_metadata.is_outlier()
)

def get_state_key(self) -> Optional[str]:
"""Get the state key of this event, or None if it's not a state event"""
return self._dict.get("state_key")
Expand Down
6 changes: 6 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,12 @@ async def on_invite_request(
)

context = EventContext.for_outlier(self._storage_controllers)

if event.is_notifiable:
await self._federation_event_handler.bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)

await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
Expand Down
15 changes: 8 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __init__(self, hs: "HomeServer"):
self._event_creation_handler = hs.get_event_creation_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._message_handler = hs.get_message_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self.bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._state_resolution_handler = hs.get_state_resolution_handler()
# avoid a circular dependency by deferring execution here
self._get_room_member_handler = hs.get_room_member_handler
Expand Down Expand Up @@ -2110,7 +2110,7 @@ async def _run_push_actions_and_persist_event(
min_depth,
)
else:
await self._bulk_push_rule_evaluator.action_for_event_by_user(
await self.bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)

Expand Down Expand Up @@ -2153,6 +2153,7 @@ async def persist_events_and_notify(
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
result = {}
try:
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
Expand All @@ -2173,28 +2174,28 @@ async def persist_events_and_notify(
# Note that this returns the events that were persisted, which may not be
# the same as were passed in if some were deduplicated due to transaction IDs.
(
events,
output_events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)

if self._ephemeral_messages_enabled:
for event in events:
for event in output_events:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
with start_active_span("notify_persisted_events"):
set_tag(
SynapseTags.RESULT_PREFIX + "event_ids",
str([ev.event_id for ev in events]),
str([ev.event_id for ev in output_events]),
)
set_tag(
SynapseTags.RESULT_PREFIX + "event_ids.length",
str(len(events)),
str(len(output_events)),
)
for event in events:
for event in output_events:
await self._notify_persisted_event(event, max_stream_token)

return max_stream_token.stream
Expand Down
10 changes: 7 additions & 3 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ async def _get_rules_for_event(

async def _get_power_levels_and_sender_level(
self, event: EventBase, context: EventContext
) -> Tuple[dict, int]:
) -> Tuple[dict, Optional[int]]:
# There are no power levels and sender levels possible to get from outlier
if event.internal_metadata.is_outlier():
return {}, None

event_types = auth_types_for_event(event.room_version, event)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
Expand Down Expand Up @@ -258,8 +262,8 @@ async def action_for_event_by_user(
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
if event.internal_metadata.is_outlier():
# This can happen due to out of band memberships
if not event.is_notifiable:
# Push rules for events that aren't notifiable can't be processed by this
return

count_as_unread = _should_count_as_unread(event, context)
Expand Down
16 changes: 8 additions & 8 deletions synapse/push/push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")


def _room_member_count(
ev: EventBase, condition: Mapping[str, Any], room_member_count: int
) -> bool:
def _room_member_count(condition: Mapping[str, Any], room_member_count: int) -> bool:
return _test_ineq_condition(condition, room_member_count)


def _sender_notification_permission(
ev: EventBase,
condition: Mapping[str, Any],
sender_power_level: int,
sender_power_level: Optional[int],
power_levels: Dict[str, Union[int, Dict[str, int]]],
) -> bool:
if sender_power_level is None:
return False

notif_level_key = condition.get("key")
if notif_level_key is None:
return False
Expand Down Expand Up @@ -129,7 +129,7 @@ def __init__(
self,
event: EventBase,
room_member_count: int,
sender_power_level: int,
sender_power_level: Optional[int],
power_levels: Dict[str, Union[int, Dict[str, int]]],
relations: Dict[str, Set[Tuple[str, str]]],
relations_match_enabled: bool,
Expand Down Expand Up @@ -198,10 +198,10 @@ def matches(
elif condition["kind"] == "contains_display_name":
return self._contains_display_name(display_name)
elif condition["kind"] == "room_member_count":
return _room_member_count(self._event, condition, self._room_member_count)
return _room_member_count(condition, self._room_member_count)
elif condition["kind"] == "sender_notification_permission":
return _sender_notification_permission(
self._event, condition, self._sender_power_level, self._power_levels
condition, self._sender_power_level, self._power_levels
)
elif (
condition["kind"] == "org.matrix.msc3772.relation_match"
Expand Down
10 changes: 6 additions & 4 deletions synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,16 +423,18 @@ async def enqueue(
for d in ret_vals:
replaced_events.update(d)

events = []
persisted_events = []
for event, _ in events_and_contexts:
existing_event_id = replaced_events.get(event.event_id)
if existing_event_id:
events.append(await self.main_store.get_event(existing_event_id))
persisted_events.append(
await self.main_store.get_event(existing_event_id)
)
else:
events.append(event)
persisted_events.append(event)

return (
events,
persisted_events,
self.main_store.get_room_max_token(),
)

Expand Down
14 changes: 6 additions & 8 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2180,13 +2180,11 @@ def _set_push_actions_for_event_and_users_txn(
appear in events_and_context.
"""

# Only non outlier events will have push actions associated with them,
# Only notifiable events will have push actions associated with them,
# so let's filter them out. (This makes joining large rooms faster, as
# these queries took seconds to process all the state events).
non_outlier_events = [
event
for event, _ in events_and_contexts
if not event.internal_metadata.is_outlier()
notifiable_events = [
event for event, _ in events_and_contexts if event.is_notifiable
]

sql = """
Expand All @@ -2199,7 +2197,7 @@ def _set_push_actions_for_event_and_users_txn(
WHERE event_id = ?
"""

if non_outlier_events:
if notifiable_events:
txn.execute_batch(
sql,
(
Expand All @@ -2209,12 +2207,12 @@ def _set_push_actions_for_event_and_users_txn(
event.depth,
event.event_id,
)
for event in non_outlier_events
for event in notifiable_events
),
)

room_to_event_ids: Dict[str, List[str]] = {}
for e in non_outlier_events:
for e in notifiable_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)

for room_id, event_ids in room_to_event_ids.items():
Expand Down

0 comments on commit 0b9dd4e

Please sign in to comment.