Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise backfill calculation #12522

Merged
merged 16 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 134 additions & 86 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
# Copyright 2020 Sorunome
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,10 +15,14 @@

"""Contains handlers for federation events."""

import enum
import itertools
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union

import attr
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
Expand Down Expand Up @@ -92,6 +96,24 @@ def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
return sorted(joined_domains.items(), key=lambda d: d[1])


class _BackfillPointType(Enum):
# a regular backwards extremity (ie, an event which we don't yet have, but which
# is referred to by other events in the DAG)
BACKWARDS_EXTREMITY = enum.auto()
Copy link
Contributor

@MadLittleMods MadLittleMods Apr 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "backward extremity" (not plural) is the correct spelling using the database as the source of truth (and the docs docs/development/room-dag-concepts.md#backward-extremity but I derived those from the code).


# an MSC2716 "insertion event"
INSERTION_PONT = enum.auto()


@attr.s(slots=True, auto_attribs=True, frozen=True)
class _BackfillPoint:
"""A potential point we might backfill from"""

event_id: str
depth: int
type: _BackfillPointType


class FederationHandler:
"""Handles general incoming federation requests

Expand Down Expand Up @@ -157,89 +179,51 @@ async def maybe_backfill(
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
room_id
)
]

insertion_events_to_be_backfilled: Dict[str, int] = {}
insertion_events_to_be_backfilled: List[_BackfillPoint] = []
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backward_extremities_in_room(
insertion_events_to_be_backfilled = [
_BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
]
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
"_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
backwards_extremities,
insertion_events_to_be_backfilled,
)

if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
if not backwards_extremities and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False

# We only want to paginate if we can actually see the events we'll get,
# as otherwise we'll just spend a lot of resources to get redacted
# events.
#
# We do this by filtering all the backwards extremities and seeing if
# any remain. Given we don't have the extremity events themselves, we
# need to actually check the events that reference them.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However
# in practice that causes more issues than its worth, as a) its
# relatively rare for there to be any visible history and b) even when
# there is its often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.
#
# TODO: If we do do a backfill then we should filter the backwards
# extremities to only include those that point to visible portions of
# history.
#
# TODO: Correctly handle the case where we are allowed to see the
# forward event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.

forward_event_ids = await self.store.get_successor_events(
list(oldest_events_with_depth)
)

extremities_events = await self.store.get_events(
forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
# we now have a list of potential places to backpaginate from. We prefer to
# start with the most recent (ie, max depth), so let's sort the list.
sorted_backfill_points: List[_BackfillPoint] = sorted(
itertools.chain(
backwards_extremities,
insertion_events_to_be_backfilled,
),
key=lambda e: -int(e.depth),
)

# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
list(extremities_events.values()),
redact=False,
check_history_visibility_only=True,
)
logger.debug(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
"_maybe_backfill_inner: room_id: %s: current_depth: %s, limit: %s, "
"backfill points (%d): %s",
room_id,
current_depth,
limit,
len(sorted_backfill_points),
sorted_backfill_points,
)

if not filtered_extremities and not insertion_events_to_be_backfilled:
return False

extremities = {
**oldest_events_with_depth,
# TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
**insertion_events_to_be_backfilled,
}

# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]

# If we're approaching an extremity we trigger a backfill, otherwise we
# no-op.
#
Expand All @@ -249,6 +233,11 @@ async def _maybe_backfill_inner(
# chose more than one times the limit in case of failure, but choosing a
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
#
# XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
# care about events that have happened after our current position.
#
max_depth = sorted_backfill_points[0].depth
if current_depth - 2 * limit > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
Expand All @@ -265,31 +254,90 @@ async def _maybe_backfill_inner(
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
filtered_sorted_extremeties_tuple = [
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]

logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)

#
# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backfill. We opt to try backfilling anyway just in case we do get
# relevant events.
if filtered_sorted_extremeties_tuple:
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
#
filtered_sorted_backfill_points = [
t for t in sorted_backfill_points if t.depth <= current_depth
]
if filtered_sorted_backfill_points:
logger.debug(
"_maybe_backfill_inner: backfill points before current depth: %s",
filtered_sorted_backfill_points,
)
sorted_backfill_points = filtered_sorted_backfill_points
else:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
)

# We only want to paginate if we can actually see the events we'll get,
# as otherwise we'll just spend a lot of resources to get redacted
# events.
#
# We do this by filtering all the backwards extremities and seeing if
# any remain.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
#
# Doing this filtering can be expensive (we load the full state for the room
# at each of the sucessor events), so we check them one at a time until we find
# one that is visible, and then stop.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However
# in practice that causes more issues than its worth, as a) its
# relatively rare for there to be any visible history and b) even when
# there is its often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.
#
# TODO: If we do do a backfill then we should filter the backwards
# extremities to only include those that point to visible portions of
# history.

found_filtered_extremity = False
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
for bp in backwards_extremities:
# Given we don't have the extremity events themselves, we
# need to actually check the events that reference them - their "successor"
# events.
#
# TODO: Correctly handle the case where we are allowed to see the
# successor event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.

forward_event_ids = await self.store.get_successor_events([bp.event_id])

extremities_events = await self.store.get_events_as_list(
successor_event_ids,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)

# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
extremities_events,
redact=False,
check_history_visibility_only=True,
)
if filtered_extremities:
found_filtered_extremity = True
break

if not found_filtered_extremity and not insertion_events_to_be_backfilled:
return False

# TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks

# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
extremities = [e.event_id for e in sorted_backfill_points[:5]]
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# Now we need to decide which hosts to hit first.

Expand Down
15 changes: 8 additions & 7 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,9 @@ def _get_auth_chain_difference_txn(
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}

async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
async def get_oldest_event_ids_with_depth_in_room(
self, room_id
) -> List[Tuple[str, int]]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.

Expand All @@ -708,7 +710,7 @@ async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, in
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""

def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
Expand Down Expand Up @@ -741,7 +743,7 @@ def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):

txn.execute(sql, (room_id, False))

return dict(txn)
return txn.fetchall()

return await self.db_pool.runInteraction(
"get_oldest_event_ids_with_depth_in_room",
Expand All @@ -751,7 +753,7 @@ def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):

async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
) -> List[Tuple[str, int]]:
"""Get the insertion events we know about that we haven't backfilled yet.

We use this function so that we can compare and see if someones current
Expand All @@ -763,7 +765,7 @@ async def get_insertion_event_backward_extremities_in_room(
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""

def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
Expand All @@ -778,8 +780,7 @@ def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
"""

txn.execute(sql, (room_id,))

return dict(txn)
return txn.fetchall()

return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",
Expand Down