Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add endpoints for backfilling history (MSC2716) (#9247)
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods authored Jun 22, 2021
1 parent 756fd51 commit 96f6293
Show file tree
Hide file tree
Showing 14 changed files with 584 additions and 23 deletions.
1 change: 1 addition & 0 deletions changelog.d/9247.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for backfilling history into rooms ([MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716)).
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
7 changes: 2 additions & 5 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,8 @@ def __init__(self, hs: "HomeServer"):
async def check_from_context(
self, room_version: str, event, context, do_sig_check=True
) -> None:
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events_by_id = await self.store.get_events(auth_events_ids)
auth_event_ids = event.auth_event_ids()
auth_events_by_id = await self.store.get_events(auth_event_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}

room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
Expand Down
15 changes: 15 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class EventTypes:
SpaceChild = "m.space.child"
SpaceParent = "m.space.parent"

MSC2716_INSERTION = "org.matrix.msc2716.insertion"
MSC2716_MARKER = "org.matrix.msc2716.marker"


class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request"
Expand Down Expand Up @@ -185,6 +188,18 @@ class EventContentFields:
# cf https://github.com/matrix-org/matrix-doc/pull/1772
ROOM_TYPE = "type"

# Used on normal messages to indicate they were historically imported after the fact
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
# For "insertion" events
MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
# Used on normal message events to indicate where the chunk connects to
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
# For "marker" events
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
MSC2716_MARKER_INSERTION_PREV_EVENTS = (
"org.matrix.msc2716.marker.insertion_prev_events"
)


class RoomEncryptionAlgorithms:
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool

# MSC2716 (backfill existing history)
self.msc2716_enabled = experimental.get("msc2716_enabled", False) # type: bool
9 changes: 9 additions & 0 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def __init__(self, internal_metadata_dict: JsonDict):
redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str
token_id = DictProperty("token_id") # type: str
historical = DictProperty("historical") # type: bool

# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
Expand Down Expand Up @@ -204,6 +205,14 @@ def is_redacted(self):
"""
return self._dict.get("redacted", False)

def is_historical(self) -> bool:
"""Whether this is a historical message.
This is used by the batchsend historical message endpoint and
is needed to and mark the event as backfilled and skip some checks
like push notifications.
"""
return self._dict.get("historical", False)


class EventBase(metaclass=abc.ABCMeta):
@property
Expand Down
17 changes: 15 additions & 2 deletions synapse/events/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Tuple, Union

import attr
Expand All @@ -33,6 +34,8 @@
from synapse.util import Clock
from synapse.util.stringutils import random_string

logger = logging.getLogger(__name__)


@attr.s(slots=True, cmp=False, frozen=True)
class EventBuilder:
Expand Down Expand Up @@ -100,6 +103,7 @@ async def build(
self,
prev_event_ids: List[str],
auth_event_ids: Optional[List[str]],
depth: Optional[int] = None,
) -> EventBase:
"""Transform into a fully signed and hashed event
Expand All @@ -108,6 +112,9 @@ async def build(
auth_event_ids: The event IDs to use as the auth events.
Should normally be set to None, which will cause them to be calculated
based on the room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns:
The signed and hashed event.
Expand All @@ -131,8 +138,14 @@ async def build(
auth_events = auth_event_ids
prev_events = prev_event_ids

old_depth = await self._store.get_max_depth_of(prev_event_ids)
depth = old_depth + 1
# Otherwise, progress the depth as normal
if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)

depth = most_recent_prev_event_depth + 1

# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
Expand Down
104 changes: 99 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,9 @@ async def create_event(
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
Given a dict from a client, create a new event.
Expand All @@ -508,6 +511,14 @@ async def create_event(
require_consent: Whether to check if the requester has
consented to the privacy policy.
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
Expand Down Expand Up @@ -563,11 +574,36 @@ async def create_event(
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id

builder.internal_metadata.outlier = outlier

builder.internal_metadata.historical = historical

# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
if auth_event_ids is not None:
temp_event = await builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)
auth_events = await self.store.get_events_as_list(auth_event_ids)
# Create a StateMap[str]
auth_event_state_map = {
(e.type, e.state_key): e.event_id for e in auth_events
}
# Actually strip down and use the necessary auth events
auth_event_ids = self.auth.compute_auth_events(
event=temp_event,
current_state_ids=auth_event_state_map,
for_verification=False,
)

event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)

# In an ideal world we wouldn't need the second part of this condition. However,
Expand Down Expand Up @@ -724,9 +760,13 @@ async def create_and_send_nonmember_event(
self,
requester: Requester,
event_dict: dict,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
Expand All @@ -736,10 +776,24 @@ async def create_and_send_nonmember_event(
Args:
requester: The requester sending the event.
event_dict: An entire event.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
from the database.
auth_event_ids:
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
send this event.
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns:
The event, and its stream ordering (if deduplication happened,
Expand Down Expand Up @@ -779,7 +833,13 @@ async def create_and_send_nonmember_event(
return event, event.internal_metadata.stream_ordering

event, context = await self.create_event(
requester, event_dict, txn_id=txn_id
requester,
event_dict,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
outlier=outlier,
depth=depth,
)

assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
Expand Down Expand Up @@ -811,6 +871,7 @@ async def create_new_client_event(
requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
Expand All @@ -828,6 +889,10 @@ async def create_new_client_event(
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Returns:
Tuple of created event, context
"""
Expand All @@ -851,9 +916,24 @@ async def create_new_client_event(
), "Attempting to create an event with no prev_events"

event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)
context = await self.state.compute_event_context(event)

old_state = None

# Pass on the outlier property from the builder to the event
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = builder.internal_metadata.outlier

# Calculate the state for outliers that pass in their own `auth_event_ids`
if auth_event_ids:
old_state = await self.store.get_events_as_list(auth_event_ids)

context = await self.state.compute_event_context(event, old_state=old_state)

if requester:
context.app_service = requester.app_service

Expand Down Expand Up @@ -1018,7 +1098,13 @@ async def _persist_event(
the arguments.
"""

await self.action_generator.handle_push_actions_for_event(event, context)
# Skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
await self.action_generator.handle_push_actions_for_event(event, context)

try:
# If we're a worker we need to hit out to the master.
Expand Down Expand Up @@ -1317,13 +1403,21 @@ async def persist_and_notify_client_event(
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")

# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True

# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self.storage.persistence.persist_event(event, context=context)
) = await self.storage.persistence.persist_event(
event, context=context, backfilled=backfilled
)

if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
Expand Down
Loading

0 comments on commit 96f6293

Please sign in to comment.