Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3541 from matrix-org/rav/optimize_filter_events_f…
Browse files Browse the repository at this point in the history
…or_server

Refactor and optimze filter_events_for_server
  • Loading branch information
richvdh authored Jul 17, 2018
2 parents bc006b3 + 94440ae commit 9c04b4a
Show file tree
Hide file tree
Showing 4 changed files with 409 additions and 138 deletions.
1 change: 1 addition & 0 deletions changelog.d/3541.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimisation to make handling incoming federation requests more efficient.
144 changes: 6 additions & 138 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.events.utils import prune_event
from synapse.events.validator import EventValidator
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
Expand All @@ -52,8 +51,8 @@
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server

from ._base import BaseHandler

Expand Down Expand Up @@ -501,137 +500,6 @@ def _process_received_pdu(self, origin, pdu, state, auth_chain):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)

@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
"""Filter the given events for the given server, redacting those the
server can't see.
Assumes the server is currently in the room.
Returns
list[FrozenEvent]
"""
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)

visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)

# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)

event_map = yield self.store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)

if all_open:
defer.returnValue(events)

# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.

event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)

# We only want to pull out member events that correspond to the
# server's domain.

def check_match(id):
try:
return server_name == get_domain_from_id(id)
except Exception:
return False

# Parses mapping `event_id -> (type, state_key) -> state event_id`
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1])
])

event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
}

erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)

def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)

if not state:
return event

history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
if history:
visibility = history.content.get("history_visibility", "shared")
if visibility in ["invited", "joined"]:
# We now loop through all state events looking for
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
domain = get_domain_from_id(ev.state_key)
except Exception:
continue

if domain != server_name:
continue

memtype = ev.membership
if memtype == Membership.JOIN:
return event
elif memtype == Membership.INVITE:
if visibility == "invited":
return event
else:
return prune_event(event)

return event

defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])

@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
Expand Down Expand Up @@ -1558,7 +1426,7 @@ def on_backfill_request(self, origin, room_id, pdu_list, limit):
limit
)

events = yield self._filter_events_for_server(origin, room_id, events)
events = yield filter_events_for_server(self.store, origin, events)

defer.returnValue(events)

Expand Down Expand Up @@ -1605,8 +1473,8 @@ def get_persisted_pdu(self, origin, event_id):
if not in_room:
raise AuthError(403, "Host not in room.")

events = yield self._filter_events_for_server(
origin, event.room_id, [event]
events = yield filter_events_for_server(
self.store, origin, [event],
)
event = events[0]
defer.returnValue(event)
Expand Down Expand Up @@ -1896,8 +1764,8 @@ def on_get_missing_events(self, origin, room_id, earliest_events,
min_depth=min_depth,
)

missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
missing_events = yield filter_events_for_server(
self.store, origin, missing_events,
)

defer.returnValue(missing_events)
Expand Down
141 changes: 141 additions & 0 deletions synapse/visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import logging
import operator

import six

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
from synapse.types import get_domain_from_id
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -225,3 +228,141 @@ def allowed(event):

# we turn it into a list before returning it.
defer.returnValue(list(filtered_events))


@defer.inlineCallbacks
def filter_events_for_server(store, server_name, events):
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)

visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)

# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)

event_map = yield store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)

if all_open:
defer.returnValue(events)

# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.

# first, for each event we're wanting to return, get the event_ids
# of the history vis and membership state at those events.
event_to_state_ids = yield store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)

# We only want to pull out member events that correspond to the
# server's domain.
#
# event_to_state_ids contains lots of duplicates, so it turns out to be
# cheaper to build a complete set of unique
# ((type, state_key), event_id) tuples, and then filter out the ones we
# don't want.
#
state_key_to_event_id_set = {
e
for key_to_eid in six.itervalues(event_to_state_ids)
for e in key_to_eid.items()
}

def include(typ, state_key):
if typ != EventTypes.Member:
return True

# we avoid using get_domain_from_id here for efficiency.
idx = state_key.find(":")
if idx == -1:
return False
return state_key[idx + 1:] == server_name

event_map = yield store.get_events([
e_id
for key, e_id in state_key_to_event_id_set
if include(key[0], key[1])
])

event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
}

erased_senders = yield store.are_users_erased(
e.sender for e in events,
)

def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)

if not state:
return event

history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
if history:
visibility = history.content.get("history_visibility", "shared")
if visibility in ["invited", "joined"]:
# We now loop through all state events looking for
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
domain = get_domain_from_id(ev.state_key)
except Exception:
continue

if domain != server_name:
continue

memtype = ev.membership
if memtype == Membership.JOIN:
return event
elif memtype == Membership.INVITE:
if visibility == "invited":
return event
else:
# server has no users in the room: redact
return prune_event(event)

return event

defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])
Loading

0 comments on commit 9c04b4a

Please sign in to comment.