
Deduplicate redundant lazy-loaded members #3331

Merged: 17 commits, Jul 26, 2018
1 change: 1 addition & 0 deletions changelog.d/3331.feature
@@ -0,0 +1 @@
add support for the include_redundant_members filter param as per MSC1227
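
For context, a client opts into this behaviour through the state section of the room filter it uploads. A minimal sketch of such a filter, written as a Python dict (the timeline limit and everything beyond the two boolean flags are illustrative, not part of this change):

# Illustrative sync filter: lazy-load membership events, but ask the server
# to keep re-sending members it has already sent to this device.
filter_json = {
    "room": {
        "state": {
            "lazy_load_members": True,
            "include_redundant_members": True,
        },
        "timeline": {"limit": 10},
    },
}

With include_redundant_members omitted or false, the server may skip members it believes the client has already seen; setting it to true asks for the pre-existing behaviour of sending them every time.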
9 changes: 9 additions & 0 deletions synapse/api/filtering.py
@@ -117,6 +117,9 @@
"lazy_load_members": {
"type": "boolean"
},
"include_redundant_members": {
"type": "boolean"
},
}
}

@@ -267,6 +270,9 @@ def ephemeral_limit(self):
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()

def include_redundant_members(self):
return self._room_state_filter.include_redundant_members()

def filter_presence(self, events):
return self._presence_filter.filter(events)

@@ -426,6 +432,9 @@ def limit(self):
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)

def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)
Member

as per https://docs.google.com/document/d/11yn-mAkYll10RJpN0mkYEVqraTbU3U4eQx9MNrzqX1U/edit?disco=AAAACEx0noo, we should consider validating the value passed by the client - presumably in the constructor rather than here.

(this applies to lazy_load_members too, of course; I just forgot it there.)

Member Author


I've fixed up the proposal doc to explicitly demand {true|false} there. This is, however, already strictly validated via the JSON schema validation at: https://github.com/matrix-org/synapse/pull/3331/files#diff-ed81002a2d319904392e1a6f871eb2edR121

Member

oooh I hadn't spotted that. well, yay!
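
For readers following along, the validation being discussed can be exercised in isolation with the jsonschema library. A small sketch, assuming a schema fragment like the one added above (this is trimmed to the two relevant flags, not the full filter schema from filtering.py):

import jsonschema

# Trimmed-down schema: just the two boolean flags relevant to this change.
ROOM_STATE_FILTER_FRAGMENT = {
    "type": "object",
    "properties": {
        "lazy_load_members": {"type": "boolean"},
        "include_redundant_members": {"type": "boolean"},
    },
}

# A boolean value passes...
jsonschema.validate({"include_redundant_members": True}, ROOM_STATE_FILTER_FRAGMENT)

# ...while anything else is rejected with a ValidationError.
try:
    jsonschema.validate({"include_redundant_members": "yes"}, ROOM_STATE_FILTER_FRAGMENT)
except jsonschema.ValidationError:
    print("non-boolean include_redundant_members rejected")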



def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
87 changes: 62 additions & 25 deletions synapse/handlers/sync.py
@@ -26,13 +26,23 @@
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client

logger = logging.getLogger(__name__)

# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000

# Remember the last 100 members we sent to a client for the purposes of
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100


SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -182,6 +192,12 @@ def __init__(self, hs):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()

# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache", self.clock,
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
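
The cache added here is two-level: an expiring outer map keyed on (user_id, device_id), each entry holding a small LRU of state_key (the member's user ID) to the event_id last sent for that member. A simplified, self-contained model of that shape, using plain Python in place of Synapse's ExpiringCache and LruCache (the expiry-on-access behaviour below is an approximation, not the real classes' semantics):

from collections import OrderedDict

class TinyLru(object):
    # Illustrative stand-in for LruCache: keeps only the most recent max_size keys.
    def __init__(self, max_size):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)
        return self._data[key]

    def set(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def clear(self):
        self._data.clear()

# Outer layer: (user_id, device_id) -> (last_touched_ms, TinyLru); entries are
# dropped once they have not been touched within max_age_ms.
member_caches = {}

def get_member_cache(user_id, device_id, now_ms,
                     max_age_ms=30 * 60 * 1000, max_size=100):
    for key, (touched_ms, _) in list(member_caches.items()):
        if now_ms - touched_ms > max_age_ms:
            del member_caches[key]
    _, cache = member_caches.get((user_id, device_id), (now_ms, TinyLru(max_size)))
    member_caches[(user_id, device_id)] = (now_ms, cache)
    return cache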

def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
with Measure(self.clock, "compute_state_delta"):

types = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
filtered_types = None

lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
sync_config.filter_collection.include_redundant_members()
)

if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
@@ -523,6 +543,11 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
# only apply the filtering to room members
filtered_types = [EventTypes.Member]

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +568,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke

state_ids = current_state_ids

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
@@ -571,21 +591,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
filtered_types=filtered_types,
)

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

# TODO: optionally filter out redundant membership events at this
# point, to stop repeatedly sending members in every /sync as if
# the client isn't tracking them.
# When implemented, this should filter using event_ids (not mxids).
# In practice, limited syncs are
# relatively rare so it's not a total disaster to send redundant
# members down at this point. Redundant members are ones which
# repeatedly get sent down /sync because we don't know if the client
# is caching them or not.

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
@@ -596,16 +601,48 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
else:
state_ids = {}
if lazy_load_members:
# TODO: filter out redundant members based on their mxids (not their
# event_ids) at this point. We know we can do it based on mxid as this
# is an non-gappy incremental sync.

if types:
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)

if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)

# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
# de-duplicated.
if since_token is None:
logger.debug("clearing LruCache for %r", cache_key)
cache.clear()
else:
# only send members which aren't in our LruCache (either
# because they're new to this client or have been pushed out
# of the cache)
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
for t, event_id in state_ids.iteritems()
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)

# add any member IDs we are about to send into our LruCache
for t, event_id in itertools.chain(
state_ids.items(),
timeline_state.items(),
):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
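
A worked example of what this de-duplication does on an incremental sync, using plain dicts in place of the per-device LruCache (the user IDs and event IDs below are invented):

# state_key -> event_id of members already sent to this device
previously_sent = {
    "@alice:example.org": "$member_alice_v1",
    "@carol:example.org": "$member_carol_v1",
}

state_ids = {
    ("m.room.member", "@alice:example.org"): "$member_alice_v1",  # same event already sent: dropped
    ("m.room.member", "@bob:example.org"): "$member_bob_v1",      # never sent to this device: kept
    ("m.room.member", "@carol:example.org"): "$member_carol_v2",  # membership changed since last send: kept
}

# mirror the filtering above: only send members the cache doesn't already hold
state_ids = {
    t: event_id
    for t, event_id in state_ids.items()
    if previously_sent.get(t[1]) != event_id
}

# then record what we are about to send, so the next sync can skip it
for (etype, state_key), event_id in state_ids.items():
    if etype == "m.room.member":
        previously_sent[state_key] = event_id

After the comprehension only Bob's and Carol's membership events remain, and both are recorded so a later incremental sync from the same device will not receive them again unless they change.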

state = {}
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))