Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor and optimise filter_events_for_server #3541

Merged
merged 6 commits into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3541.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimisation to make handling incoming federation requests more efficient.
144 changes: 6 additions & 138 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.events.utils import prune_event
from synapse.events.validator import EventValidator
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
Expand All @@ -52,8 +51,8 @@
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server

from ._base import BaseHandler

Expand Down Expand Up @@ -501,137 +500,6 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)

@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
"""Filter the given events for the given server, redacting those the
server can't see.

Assumes the server is currently in the room.

Returns
list[FrozenEvent]
"""
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)

visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)

# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)

event_map = yield self.store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)

if all_open:
defer.returnValue(events)

# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.

event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)

# We only want to pull out member events that correspond to the
# server's domain.

def check_match(id):
try:
return server_name == get_domain_from_id(id)
except Exception:
return False

# Parses mapping `event_id -> (type, state_key) -> state event_id`
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1])
])

event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
}

erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)

def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)

if not state:
return event

history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
if history:
visibility = history.content.get("history_visibility", "shared")
if visibility in ["invited", "joined"]:
# We now loop through all state events looking for
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
domain = get_domain_from_id(ev.state_key)
except Exception:
continue

if domain != server_name:
continue

memtype = ev.membership
if memtype == Membership.JOIN:
return event
elif memtype == Membership.INVITE:
if visibility == "invited":
return event
else:
return prune_event(event)

return event

defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])

@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
Expand Down Expand Up @@ -1558,7 +1426,7 @@ def on_backfill_request(self, origin, room_id, pdu_list, limit):
limit
)

events = yield self._filter_events_for_server(origin, room_id, events)
events = yield filter_events_for_server(self.store, origin, events)

defer.returnValue(events)

Expand Down Expand Up @@ -1605,8 +1473,8 @@ def get_persisted_pdu(self, origin, event_id):
if not in_room:
raise AuthError(403, "Host not in room.")

events = yield self._filter_events_for_server(
origin, event.room_id, [event]
events = yield filter_events_for_server(
self.store, origin, [event],
)
event = events[0]
defer.returnValue(event)
Expand Down Expand Up @@ -1896,8 +1764,8 @@ def on_get_missing_events(self, origin, room_id, earliest_events,
min_depth=min_depth,
)

missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
missing_events = yield filter_events_for_server(
self.store, origin, missing_events,
)

defer.returnValue(missing_events)
Expand Down
141 changes: 141 additions & 0 deletions synapse/visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import logging
import operator

import six

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
from synapse.types import get_domain_from_id
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -225,3 +228,141 @@ def allowed(event):

# we turn it into a list before returning it.
defer.returnValue(list(filtered_events))


@defer.inlineCallbacks
def filter_events_for_server(store, server_name, events):
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)

visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)

# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)

event_map = yield store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)

if all_open:
defer.returnValue(events)

# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.

# first, for each event we're wanting to return, get the event_ids
# of the history vis and membership state at those events.
event_to_state_ids = yield store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)

# We only want to pull out member events that correspond to the
# server's domain.
#
# event_to_state_ids contains lots of duplicates, so it turns out to be
# cheaper to build a complete set of unique
# ((type, state_key), event_id) tuples, and then filter out the ones we
# don't want.
#
state_key_to_event_id_set = {
e
for key_to_eid in six.itervalues(event_to_state_ids)
for e in key_to_eid.items()
}

def include(typ, state_key):
if typ != EventTypes.Member:
return True

# we avoid using get_domain_from_id here for efficiency.
idx = state_key.find(":")
if idx == -1:
return False
return state_key[idx + 1:] == server_name

event_map = yield store.get_events([
e_id
for key, e_id in state_key_to_event_id_set
if include(key[0], key[1])
])

event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
}

erased_senders = yield store.are_users_erased(
e.sender for e in events,
)

def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)

if not state:
return event

history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
if history:
visibility = history.content.get("history_visibility", "shared")
if visibility in ["invited", "joined"]:
# We now loop through all state events looking for
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
domain = get_domain_from_id(ev.state_key)
except Exception:
continue

if domain != server_name:
continue

memtype = ev.membership
if memtype == Membership.JOIN:
return event
elif memtype == Membership.INVITE:
if visibility == "invited":
return event
else:
# server has no users in the room: redact
return prune_event(event)

return event

defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])
Loading