Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add room_version param to get_pdu #4448

Merged
merged 3 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4448.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add infrastructure to support different event formats
11 changes: 8 additions & 3 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def __init__(self, hs):
self._clock = hs.get_clock()

@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
include_none=False):
def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
outlier=False, include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of
Expand All @@ -56,8 +56,12 @@ def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
a new list.

Args:
origin (str)
pdu (list)
outlier (bool)
room_version (str)
outlier (bool): Whether the events are outliers or not
include_none (str): Whether to include None in the returned list
for events that have failed their checks

Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
Expand All @@ -84,6 +88,7 @@ def handle_check_result(pdu, deferred):
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
room_version=room_version,
outlier=outlier,
timeout=10000,
)
Expand Down
47 changes: 40 additions & 7 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

from twisted.internet import defer

from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RoomVersions,
)
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
Expand Down Expand Up @@ -202,7 +207,8 @@ def backfill(self, dest, context, limit, extremities):

@defer.inlineCallbacks
@log_function
def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
def get_pdu(self, destinations, event_id, room_version, outlier=False,
timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.

Expand All @@ -212,6 +218,7 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
Expand Down Expand Up @@ -352,10 +359,13 @@ def get_state_for_room(self, destination, room_id, event_id):
ev.event_id for ev in itertools.chain(pdus, auth_chain)
])

room_version = yield self.store.get_room_version(room_id)

signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in pdus if p.event_id not in seen_events],
outlier=True
outlier=True,
room_version=room_version,
)
signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
Expand All @@ -364,7 +374,8 @@ def get_state_for_room(self, destination, room_id, event_id):
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in auth_chain if p.event_id not in seen_events],
outlier=True
outlier=True,
room_version=room_version,
)
signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
Expand Down Expand Up @@ -411,6 +422,8 @@ def random_server_list():
random.shuffle(srvs)
return srvs

room_version = yield self.store.get_room_version(room_id)

batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
Expand All @@ -421,6 +434,7 @@ def random_server_list():
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]
Expand Down Expand Up @@ -450,8 +464,11 @@ def get_event_auth(self, destination, room_id, event_id):
for p in res["auth_chain"]
]

room_version = yield self.store.get_room_version(room_id)

signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True
destination, auth_chain,
outlier=True, room_version=room_version,
)

signed_auth.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -650,9 +667,21 @@ def send_request(destination):
for p in itertools.chain(state, auth_chain)
}

room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
room_version = e.content.get("room_version", RoomVersions.V1)
break

if room_version is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")

valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, list(pdus.values()),
outlier=True,
room_version=room_version,
)

valid_pdus_map = {
Expand Down Expand Up @@ -790,8 +819,10 @@ def query_auth(self, destination, room_id, event_id, local_auth):
for e in content["auth_chain"]
]

room_version = yield self.store.get_room_version(room_id)

signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True
destination, auth_chain, outlier=True, room_version=room_version,
)

signed_auth.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -838,8 +869,10 @@ def get_missing_events(self, destination, room_id, earliest_events_ids,
for e in content.get("events", [])
]

room_version = yield self.store.get_room_version(room_id)

signed_events = yield self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False
destination, events, outlier=False, room_version=room_version,
)
except HttpResponseException as e:
if not e.code == 400:
Expand Down
4 changes: 3 additions & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,10 @@ def on_query_auth_request(self, origin, content, room_id, event_id):
for e in content["auth_chain"]
]

room_version = yield self.store.get_room_version(room_id)

signed_auth = yield self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True
origin, auth_chain, outlier=True, room_version=room_version,
)

ret = yield self.handler.on_query_auth(
Expand Down
17 changes: 15 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
EventTypes,
Membership,
RejectedReason,
RoomVersions,
)
from synapse.api.errors import (
AuthError,
Expand Down Expand Up @@ -342,6 +343,8 @@ def on_receive_pdu(
room_id, event_id, p,
)

room_version = yield self.store.get_room_version(room_id)

with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
Expand All @@ -355,7 +358,7 @@ def on_receive_pdu(
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
[origin], p, outlier=True,
[origin], p, room_version, outlier=True,
)

if remote_event is None:
Expand All @@ -379,7 +382,6 @@ def on_receive_pdu(
for x in remote_state:
event_map[x.event_id] = x

room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
Expand Down Expand Up @@ -655,6 +657,8 @@ def backfill(self, dest, room_id, limit, extremities):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")

room_version = yield self.store.get_room_version(room_id)

events = yield self.federation_client.backfill(
dest,
room_id,
Expand Down Expand Up @@ -748,6 +752,7 @@ def backfill(self, dest, room_id, limit, extremities):
self.federation_client.get_pdu,
[dest],
event_id,
room_version=room_version,
outlier=True,
timeout=10000,
)
Expand Down Expand Up @@ -1659,6 +1664,13 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
create_event = e
break

if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")

room_version = create_event.content.get("room_version", RoomVersions.V1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible for us to get here without a create_event (if the auth chain is faulty), in which case this will explode rather than failing sensibly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so, but may as well add a check anyway.


missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
for e_id in e.auth_event_ids():
Expand All @@ -1669,6 +1681,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event):
m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
room_version=room_version,
outlier=True,
timeout=10000,
)
Expand Down