This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Deduplicate redundant lazy-loaded members #3331

Merged 17 commits on Jul 26, 2018
Changes shown are from 15 of the 17 commits.
1 change: 1 addition & 0 deletions changelog.d/3331.feature
@@ -0,0 +1 @@
Add support for the include_redundant_members filter param as per MSC1227
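For context, a minimal sketch of the filter a client might upload to opt into this behaviour. The parameter names come from this PR; the nesting under room.state follows my reading of MSC1227 and of the _room_state_filter delegation below, so treat the exact shape as an assumption rather than a quote from the spec.

```python
# Hypothetical filter body, uploaded via the usual filter-upload endpoint
# (POST /_matrix/client/r0/user/{userId}/filter) and then referenced from /sync.
EXAMPLE_FILTER = {
    "room": {
        "state": {
            "lazy_load_members": True,
            # False (or absent): the server may de-duplicate, i.e. membership
            # events it has already sent to this device are not sent again.
            # True: redundant members are always included.
            "include_redundant_members": False,
        },
    },
}
```

Per the review discussion further down, clients that change these lazy-loading flags are expected to start a new sync sequence (a re-initial-sync).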
9 changes: 9 additions & 0 deletions synapse/api/filtering.py
@@ -117,6 +117,9 @@
"lazy_load_members": {
"type": "boolean"
},
"include_redundant_members": {
"type": "boolean"
},
}
}

@@ -267,6 +270,9 @@ def ephemeral_limit(self):
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()

def include_redundant_members(self):
return self._room_state_filter.include_redundant_members()

def filter_presence(self, events):
return self._presence_filter.filter(events)

@@ -426,6 +432,9 @@ def limit(self):
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)

def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)
Member: as per https://docs.google.com/document/d/11yn-mAkYll10RJpN0mkYEVqraTbU3U4eQx9MNrzqX1U/edit?disco=AAAACEx0noo, we should consider validating the value passed by the client - presumably in the constructor rather than here.

(this applies to lazy_load_members too, of course; I just forgot it there.)

Member Author: I've fixed up the proposal doc to explicitly demand {true|false} there. This is in any case strictly validated via the JSON schema validation at https://github.com/matrix-org/synapse/pull/3331/files#diff-ed81002a2d319904392e1a6f871eb2edR121

Member: oooh I hadn't spotted that. well, yay!
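To illustrate the point about schema validation: a boolean type constraint like the one added in the filtering.py hunk above is enough to reject a malformed value at filter-upload time. Below is a minimal sketch using the jsonschema library (which Synapse uses for filter validation); the schema fragment and the surrounding handling are illustrative, not the exact code in filtering.py.

```python
import jsonschema

# Illustrative fragment mirroring the schema addition above.
ROOM_EVENT_FILTER_FRAGMENT = {
    "type": "object",
    "properties": {
        "lazy_load_members": {"type": "boolean"},
        "include_redundant_members": {"type": "boolean"},
    },
}

# A boolean value validates fine...
jsonschema.validate({"include_redundant_members": True}, ROOM_EVENT_FILTER_FRAGMENT)

# ...whereas anything else is rejected, so no extra check is needed in the
# Filter constructor.
try:
    jsonschema.validate({"include_redundant_members": "yes"}, ROOM_EVENT_FILTER_FRAGMENT)
except jsonschema.ValidationError as exc:
    print("invalid filter: %s" % (exc.message,))
```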



def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
88 changes: 63 additions & 25 deletions synapse/handlers/sync.py
@@ -26,13 +26,23 @@
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client

logger = logging.getLogger(__name__)

# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000

# Remember the last 100 members we sent to a client for the purposes of
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100


SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -182,6 +192,12 @@ def __init__(self, hs):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()

# ExpiringCache((User, Device)) -> LruCache(membership event_id)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache", self.clock,
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE
Member: trailing comma please

Member Author: done

)

def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
with Measure(self.clock, "compute_state_delta"):

types = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
filtered_types = None

lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
sync_config.filter_collection.include_redundant_members()
)

if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
@@ -520,9 +540,24 @@
)
]

if not include_redundant_members:
Member: why are we doing this here, rather than where the cache is used below?

Member Author: hysterical reasons. fixed.

cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)

# only apply the filtering to room members
filtered_types = [EventTypes.Member]

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +578,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke

state_ids = current_state_ids

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
@@ -571,21 +601,6 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
filtered_types=filtered_types,
)

timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}

# TODO: optionally filter out redundant membership events at this
# point, to stop repeatedly sending members in every /sync as if
# the client isn't tracking them.
# When implemented, this should filter using event_ids (not mxids).
# In practice, limited syncs are
# relatively rare so it's not a total disaster to send redundant
# members down at this point. Redundant members are ones which
# repeatedly get sent down /sync because we don't know if the client
# is caching them or not.

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
@@ -596,16 +611,39 @@ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_toke
else:
state_ids = {}
if lazy_load_members:
# TODO: filter out redundant members based on their mxids (not their
# event_ids) at this point. We know we can do it based on mxid as this
# is a non-gappy incremental sync.

if types:
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)

if lazy_load_members and not include_redundant_members:
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
# de-duplicated.
if since_token is None:
logger.debug("clearing LruCache for %r", cache_key)
cache.clear()
else:
# only send members which aren't in our LruCache (either
# because they're new to this client or have been pushed out
# of the cache)
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: state_id
for t, state_id in state_ids.iteritems()
Member: I won't ask you to change the existing state_ids var, but can you s/state_id/event_id/ if it's an event id? To me a state_id sounds more like a state group id than an event id.

Member Author: done

if not cache.get(state_id)
Member: I've got a feeling this isn't going to be adequate. It's possible for state to revert to an earlier event thanks to state resolution: so for example Bob's member event might be A, then B, then back to A. In this case we won't tell clients it's gone back to A, because A is already in the cache.

(Admittedly there are probably other bugs in the sync code in this area, but let's not add more.)

I suspect you need to maintain the latest (type, state_key) => event_id mapping in the cache, rather than just a list of event ids.

Member Author: good point. fixed.

Member Author: (although I maintain the cache as state_key => event_id for simplicity and efficiency, as type is redundant - see the sketch at the end of this diff)

}
logger.debug("...to %r", state_ids)

# add any member IDs we are about to send into our LruCache
Member: it seems problematic that we only populate the cache if lazy_load_members and not include_redundant_members. What if the client calls sync with include_redundant_members=False, and then later calls it with it set to True?

I can see an efficiency argument, but if we're going to say that's a thing that clients can't do, let's spell it out in the proposal, along with the steps they would need to take to change their mind (presumably a re-initial-sync?)

Relatedly, is there a danger of it breaking for people who switch between client versions that have support and those that don't? I can't think of a failure offhand, but it might be worth thinking a bit harder about it?

Member Author: @bwindels already hit this whilst implementing it on riot-web. We'll need to mandate that clients do a re-initial-sync if they change their lazy-loading config (whether that's wrt redundancy or laziness). I'll add it to the proposal.

for t, event_id in itertools.chain(
state_ids.items(),
timeline_state.items(),
):
if t[0] == EventTypes.Member:
cache.set(event_id, True)

state = {}
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
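For reference, a minimal sketch of the state_key => event_id caching agreed in the review thread above. The diff shown here is from an earlier commit, before that fix landed, so this is an illustration of the idea rather than the final code: the helper names are mine, and the cache is only assumed to expose the get/set methods used in the hunk above.

```python
from synapse.api.constants import EventTypes

def filter_members_for_device(cache, member_event_ids):
    """Drop membership events this device has already been sent.

    ``cache`` maps state_key (an mxid) -> the membership event_id last sent
    to this device.  Keying on state_key (rather than remembering bare
    event_ids) means that if Bob's membership goes A -> B -> back to A, the
    cached entry for Bob is B at that point, so A is correctly re-sent.

    ``member_event_ids`` maps (EventTypes.Member, state_key) -> event_id.
    """
    return {
        key: event_id
        for key, event_id in member_event_ids.items()
        if cache.get(key[1]) != event_id
    }

def remember_sent_members(cache, sent_event_ids):
    """Record the membership event_ids we are about to send down /sync."""
    for (typ, state_key), event_id in sent_event_ids.items():
        if typ == EventTypes.Member:
            cache.set(state_key, event_id)
```

Compared with the event_id-keyed cache in the hunk above, this stores one entry per member rather than one per membership event, so a member's older event can never shadow a newer one.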