-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Avoid backfill when we already have messages to return #15737
Changes from 9 commits
70f636e
3250dc6
8c5d9ec
4de6313
cd61e74
da94217
fd349c4
9a0156b
7740e8b
b1cd673
6c68b72
000923a
db68fcc
812968e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve `/messages` response time by avoiding backfill when we already have messages to return. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,11 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
# How many single-event gaps we tolerate in the events returned by a `/messages` | ||
# response before we backfill and try to fill in the history. This is an arbitrarily | ||
# picked number and may need to be tuned in the future. | ||
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 | ||
|
||
|
||
@attr.s(slots=True, auto_attribs=True) | ||
class PurgeStatus: | ||
|
@@ -486,35 +491,35 @@ async def get_messages( | |
room_id, room_token.stream | ||
) | ||
|
||
if not use_admin_priviledge and membership == Membership.LEAVE: | ||
# If they have left the room then clamp the token to be before | ||
# they left the room, to save the effort of loading from the | ||
# database. | ||
|
||
# This is only None if the room is world_readable, in which | ||
# case "JOIN" would have been returned. | ||
assert member_event_id | ||
# If they have left the room then clamp the token to be before | ||
# they left the room, to save the effort of loading from the | ||
# database. | ||
if ( | ||
pagin_config.direction == Direction.BACKWARDS | ||
and not use_admin_priviledge | ||
and membership == Membership.LEAVE | ||
): | ||
# This is only None if the room is world_readable, in which case | ||
# "Membership.JOIN" would have been returned and we should never hit | ||
# this branch. | ||
assert member_event_id | ||
|
||
leave_token = await self.store.get_topological_token_for_event( | ||
member_event_id | ||
) | ||
assert leave_token.topological is not None | ||
|
||
leave_token = await self.store.get_topological_token_for_event( | ||
member_event_id | ||
if leave_token.topological < curr_topo: | ||
from_token = from_token.copy_and_replace( | ||
StreamKeyType.ROOM, leave_token | ||
) | ||
assert leave_token.topological is not None | ||
|
||
if leave_token.topological < curr_topo: | ||
from_token = from_token.copy_and_replace( | ||
StreamKeyType.ROOM, leave_token | ||
) | ||
|
||
await self.hs.get_federation_handler().maybe_backfill( | ||
room_id, | ||
curr_topo, | ||
limit=pagin_config.limit, | ||
) | ||
|
||
to_room_key = None | ||
if pagin_config.to_token: | ||
to_room_key = pagin_config.to_token.room_key | ||
|
||
# Initially fetch the events from the database. With any luck, we can return | ||
# these without blocking on backfill (handled below). | ||
events, next_key = await self.store.paginate_room_events( | ||
room_id=room_id, | ||
from_key=from_token.room_key, | ||
|
@@ -524,6 +529,93 @@ async def get_messages( | |
event_filter=event_filter, | ||
) | ||
|
||
if pagin_config.direction == Direction.BACKWARDS: | ||
# We use a `Set` because there can be multiple events at a given depth | ||
# and we only care about looking at the unique continuum of depths to | ||
# find gaps. | ||
event_depths: Set[int] = {event.depth for event in events} | ||
sorted_event_depths = sorted(event_depths) | ||
|
||
found_big_gap = False | ||
number_of_gaps = 0 | ||
previous_event_depth = ( | ||
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0 | ||
) | ||
for event_depth in sorted_event_depths: | ||
# We don't expect a negative depth but we'll just deal with it in | ||
# any case by taking the absolute value to get the true gap between | ||
# any two integers. | ||
depth_gap = abs(event_depth - previous_event_depth) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since the list is sorted, won't this always be positive even if there are negative depths? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes ex. 🤷 Perhaps this gets across that we care about the absolute distance, not the difference in any case. |
||
# A `depth_gap` of 1 is a normal continuous chain to the next event | ||
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's | ||
# also possible there is no event at a given depth but we can't ever | ||
# know that for sure) | ||
if depth_gap > 1: | ||
number_of_gaps += 1 | ||
|
||
# We only tolerate a small number of single-event long gaps in the | ||
# returned events because those are most likely just events we've | ||
# failed to pull in the past. Anything longer than that is probably | ||
# a sign that we're missing a decent chunk of history and we should | ||
# try to backfill it. | ||
# | ||
# XXX: It's possible we could tolerate longer gaps if we checked | ||
# that a given event's `prev_events` is one that has failed pull | ||
# attempts and we could just treat it like a dead branch of history | ||
# for now or at least something that we don't need to block the | ||
# client on to try pulling. | ||
# | ||
# XXX: If we had something like MSC3871 to indicate gaps in the | ||
# timeline to the client, we could also get away with any sized gap | ||
# and just have the client refetch the holes as they see fit. | ||
if depth_gap > 2: | ||
found_big_gap = True | ||
break | ||
previous_event_depth = event_depth | ||
|
||
# Backfill in the foreground if we found a big gap, have too many holes, | ||
# or we don't have enough events to fill the limit that the client asked | ||
# for. | ||
missing_too_many_events = ( | ||
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD | ||
) | ||
not_enough_events_to_fill_response = len(events) < pagin_config.limit | ||
if ( | ||
found_big_gap | ||
or missing_too_many_events | ||
or not_enough_events_to_fill_response | ||
): | ||
did_backfill = ( | ||
await self.hs.get_federation_handler().maybe_backfill( | ||
room_id, | ||
curr_topo, | ||
limit=pagin_config.limit, | ||
) | ||
) | ||
|
||
# If we did backfill something, refetch the events from the database to | ||
# catch anything new that might have been added since we last fetched. | ||
Comment on lines
+597
to
+598
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the worst case now when we do need to backfill, it does mean that we call `paginate_room_events` twice. Plus, a slow backfill that takes 10+ seconds for the federation request and processing isn't hurt much by an extra 250ms. |
||
if did_backfill: | ||
events, next_key = await self.store.paginate_room_events( | ||
room_id=room_id, | ||
from_key=from_token.room_key, | ||
to_key=to_room_key, | ||
direction=pagin_config.direction, | ||
limit=pagin_config.limit, | ||
event_filter=event_filter, | ||
) | ||
else: | ||
# Otherwise, we can backfill in the background for eventual | ||
# consistency's sake but we don't need to block the client waiting | ||
# for a costly federation call and processing. | ||
run_as_background_process( | ||
"maybe_backfill_in_the_background", | ||
self.hs.get_federation_handler().maybe_backfill, | ||
room_id, | ||
curr_topo, | ||
limit=pagin_config.limit, | ||
) | ||
|
||
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) | ||
|
||
# if no events are returned from pagination, that implies | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is just pulled out from the existing condition to de-nest it further up here and have it ready when we initially fetch.