diff --git a/changelog.d/4403.misc b/changelog.d/4403.misc new file mode 100644 index 000000000000..d07a4fd61e9d --- /dev/null +++ b/changelog.d/4403.misc @@ -0,0 +1 @@ +Add concept of different event format versions diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 46c4b4b9dc00..51ee078bc372 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -120,6 +120,19 @@ class RoomVersions(object): RoomVersions.STATE_V2_TEST, } + +class EventFormatVersions(object): + """This is an internal enum for tracking the version of the event format, + independently from the room version. + """ + V1 = 1 + + +KNOWN_EVENT_FORMAT_VERSIONS = { + EventFormatVersions.V1, +} + + ServerNoticeMsgType = "m.server_notice" ServerNoticeLimitReached = "m.server_notice.usage_limit_reached" diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 84c75495d544..3b6576ad269f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -18,6 +18,11 @@ import six +from synapse.api.constants import ( + KNOWN_EVENT_FORMAT_VERSIONS, + KNOWN_ROOM_VERSIONS, + EventFormatVersions, +) from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -179,6 +184,8 @@ def auth_event_ids(self): class FrozenEvent(EventBase): + format_version = EventFormatVersions.V1 # All events of this type are V1 + def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): event_dict = dict(event_dict) @@ -232,3 +239,39 @@ def __repr__(self): self.get("type", None), self.get("state_key", None), ) + + +def event_type_from_room_version(room_version): + """Returns the python type to use to construct an Event object for the + given room version. + + Args: + room_version (str): The room version + + Returns: + type: A type that can be initialized as per the initializer of + `FrozenEvent` + """ + if room_version not in KNOWN_ROOM_VERSIONS: + raise Exception( + "No event format defined for version %r" % (room_version,) + ) + return FrozenEvent + + +def event_type_from_format_version(format_version): + """Returns the python type to use to construct an Event object for the + given event format version. + + Args: + format_version (int): The event format version + + Returns: + type: A type that can be initialized as per the initializer of + `FrozenEvent` + """ + if format_version not in KNOWN_EVENT_FORMAT_VERSIONS: + raise Exception( + "No event format %r" % (format_version,) + ) + return FrozenEvent diff --git a/synapse/events/builder.py b/synapse/events/builder.py index e662eaef1061..7e633710957c 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -15,12 +15,39 @@ import copy +from synapse.api.constants import RoomVersions from synapse.types import EventID from synapse.util.stringutils import random_string from . import EventBase, FrozenEvent, _event_dict_property +def get_event_builder(room_version, key_values={}, internal_metadata_dict={}): + """Generate an event builder appropriate for the given room version + + Args: + room_version (str): Version of the room that we're creating an + event builder for + key_values (dict): Fields used as the basis of the new event + internal_metadata_dict (dict): Used to create the `_EventInternalMetadata` + object. + + Returns: + EventBuilder + """ + if room_version in { + RoomVersions.V1, + RoomVersions.V2, + RoomVersions.VDH_TEST, + RoomVersions.STATE_V2_TEST, + }: + return EventBuilder(key_values, internal_metadata_dict) + else: + raise Exception( + "No event format defined for version %r" % (room_version,) + ) + + class EventBuilder(EventBase): def __init__(self, key_values={}, internal_metadata_dict={}): signatures = copy.deepcopy(key_values.pop("signatures", {})) @@ -58,7 +85,29 @@ def create_event_id(self): return e_id.to_string() - def new(self, key_values={}): + def new(self, room_version, key_values={}): + """Generate an event builder appropriate for the given room version + + Args: + room_version (str): Version of the room that we're creating an + event builder for + key_values (dict): Fields used as the basis of the new event + + Returns: + EventBuilder + """ + + # There's currently only the one event version defined + if room_version not in { + RoomVersions.V1, + RoomVersions.V2, + RoomVersions.VDH_TEST, + RoomVersions.STATE_V2_TEST, + }: + raise Exception( + "No event format defined for version %r" % (room_version,) + ) + key_values["event_id"] = self.create_event_id() time_now = int(self.clock.time_msec()) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b7ad729c634e..e86689ef395d 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,7 +23,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import FrozenEvent +from synapse.events import event_type_from_room_version from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.types import get_domain_from_id @@ -43,7 +43,8 @@ def __init__(self, hs): self._clock = hs.get_clock() @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, + def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version, + outlier=False, include_none=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in @@ -56,8 +57,12 @@ def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, a new list. Args: - pdu (list) - outlier (bool) + origin (str): where the events came from + pdus (list) + room_version (str): The version of the room the events belong to + outlier (bool): Are the event outliers or not + include_none (bool): Whether the returned list should have `None` + for invalid PDUs or not Returns: Deferred : A list of PDUs that have valid signatures and hashes. @@ -84,6 +89,7 @@ def handle_check_result(pdu, deferred): res = yield self.get_pdu( destinations=[pdu.origin], event_id=pdu.event_id, + room_version=room_version, outlier=outlier, timeout=10000, ) @@ -297,11 +303,12 @@ def _is_invite_via_3pid(event): ) -def event_from_pdu_json(pdu_json, outlier=False): +def event_from_pdu_json(pdu_json, room_version, outlier=False): """Construct a FrozenEvent from an event json received over federation Args: - pdu_json (object): pdu as received over federation + pdu_json (object): PDU as received over federation + room_version (str): The version of the room this PDU belongs to outlier (bool): True to mark this event as an outlier Returns: @@ -325,9 +332,7 @@ def event_from_pdu_json(pdu_json, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = FrozenEvent( - pdu_json - ) + event = event_type_from_room_version(room_version)(pdu_json) event.internal_metadata.outlier = outlier diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64b7..bd58cda76ef2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,12 @@ from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RoomVersions, +) from synapse.api.errors import ( CodeMessageException, FederationDeniedError, @@ -187,8 +192,9 @@ def backfill(self, dest, context, limit, extremities): logger.debug("backfill transaction_data=%s", repr(transaction_data)) + room_version = yield self.store.get_room_version(context) pdus = [ - event_from_pdu_json(p, outlier=False) + event_from_pdu_json(p, room_version, outlier=False) for p in transaction_data["pdus"] ] @@ -202,7 +208,7 @@ def backfill(self, dest, context, limit, extremities): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False, timeout=None): + def get_pdu(self, destinations, event_id, room_version, outlier=False, timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,6 +218,7 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): Args: destinations (list): Which home servers to query event_id (str): event to fetch + room_version (str): The room version of the room of the event outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -245,7 +252,7 @@ def get_pdu(self, destinations, event_id, outlier=False, timeout=None): logger.debug("transaction_data %r", transaction_data) pdu_list = [ - event_from_pdu_json(p, outlier=outlier) + event_from_pdu_json(p, room_version, outlier=outlier) for p in transaction_data["pdus"] ] @@ -299,6 +306,8 @@ def get_state_for_room(self, destination, room_id, event_id): A list of events in the state, and a list of events in the auth chain for the given event. """ + room_version = yield self.store.get_room_version(room_id) + try: # First we try and ask for just the IDs, as thats far quicker if # we have most of the state and auth_chain already. @@ -311,7 +320,8 @@ def get_state_for_room(self, destination, room_id, event_id): auth_event_ids = result.get("auth_chain_ids", []) fetched_events, failed_to_fetch = yield self.get_events( - [destination], room_id, set(state_event_ids + auth_event_ids) + [destination], room_id, set(state_event_ids + auth_event_ids), + room_version=room_version, ) if failed_to_fetch: @@ -340,11 +350,12 @@ def get_state_for_room(self, destination, room_id, event_id): ) pdus = [ - event_from_pdu_json(p, outlier=True) for p in result["pdus"] + event_from_pdu_json(p, room_version, outlier=True) + for p in result["pdus"] ] auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in result.get("auth_chain", []) ] @@ -355,6 +366,7 @@ def get_state_for_room(self, destination, room_id, event_id): signed_pdus = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in pdus if p.event_id not in seen_events], + room_version=room_version, outlier=True ) signed_pdus.extend( @@ -364,6 +376,7 @@ def get_state_for_room(self, destination, room_id, event_id): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in auth_chain if p.event_id not in seen_events], + room_version=room_version, outlier=True ) signed_auth.extend( @@ -375,7 +388,8 @@ def get_state_for_room(self, destination, room_id, event_id): defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks - def get_events(self, destinations, room_id, event_ids, return_local=True): + def get_events(self, destinations, room_id, event_ids, room_version, + return_local=True): """Fetch events from some remote destinations, checking if we already have them. @@ -383,6 +397,7 @@ def get_events(self, destinations, room_id, event_ids, return_local=True): destinations (list) room_id (str) event_ids (list) + room_version (str): The version of the room the events belong to return_local (bool): Whether to include events we already have in the DB in the returned list of events @@ -421,6 +436,7 @@ def random_server_list(): self.get_pdu, destinations=random_server_list(), event_id=e_id, + room_version=room_version, ) for e_id in batch ] @@ -445,13 +461,16 @@ def get_event_auth(self, destination, room_id, event_id): destination, room_id, event_id, ) + room_version = yield self.store.get_room_version(room_id) + auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in res["auth_chain"] ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, + room_version=room_version, outlier=True, ) signed_auth.sort(key=lambda e: e.depth) @@ -536,8 +555,9 @@ def make_membership_event(self, destinations, room_id, user_id, membership, params (dict[str, str|Iterable[str]]): Query parameters to include in the request. Return: - Deferred: resolves to a tuple of (origin (str), event (object)) - where origin is the remote homeserver which generated the event. + Deferred[tuple[str, dict, str]]: resolves to a tuple of + `(origin, event, room_version)` where origin is the remote + homeserver which generated the event. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. @@ -557,6 +577,8 @@ def send_request(destination): destination, room_id, user_id, membership, params, ) + room_version = ret.get("room_version", RoomVersions.V1) + pdu_dict = ret.get("event", None) if not isinstance(pdu_dict, dict): raise InvalidResponseError("Bad 'event' field in response") @@ -571,17 +593,17 @@ def send_request(destination): if "prev_state" not in pdu_dict: pdu_dict["prev_state"] = [] - ev = builder.EventBuilder(pdu_dict) + ev = builder.get_event_builder(room_version, pdu_dict) defer.returnValue( - (destination, ev) + (destination, ev, room_version) ) return self._try_destination_list( "make_" + membership, destinations, send_request, ) - def send_join(self, destinations, pdu): + def send_join(self, destinations, pdu, room_version): """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -614,7 +636,7 @@ def check_authchain_validity(signed_auth_chain): ) # the room version should be sane. - room_version = create_event.content.get("room_version", "1") + room_version = create_event.content.get("room_version", RoomVersions.V1) if room_version not in KNOWN_ROOM_VERSIONS: # This shouldn't be possible, because the remote server should have # rejected the join attempt during make_join. @@ -636,12 +658,12 @@ def send_request(destination): logger.debug("Got content: %s", content) state = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("state", []) ] auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("auth_chain", []) ] @@ -652,6 +674,7 @@ def send_request(destination): valid_pdus = yield self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), + room_version=room_version, outlier=True, ) @@ -707,7 +730,9 @@ def send_invite(self, destination, room_id, event_id, pdu): logger.debug("Got response to send_invite: %s", pdu_dict) - pdu = event_from_pdu_json(pdu_dict) + room_version = yield self.store.get_room_version(room_id) + + pdu = event_from_pdu_json(pdu_dict, room_version) # Check signatures are correct. pdu = yield self._check_sigs_and_hash(pdu) @@ -785,13 +810,17 @@ def query_auth(self, destination, room_id, event_id, local_auth): content=send_content, ) + room_version = yield self.store.get_room_version(room_id) + auth_chain = [ - event_from_pdu_json(e) + event_from_pdu_json(e, room_version) for e in content["auth_chain"] ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, + room_version=room_version, + outlier=True, ) signed_auth.sort(key=lambda e: e.depth) @@ -833,13 +862,17 @@ def get_missing_events(self, destination, room_id, earliest_events_ids, timeout=timeout, ) + room_version = yield self.store.get_room_version(room_id) + events = [ - event_from_pdu_json(e) + event_from_pdu_json(e, room_version) for e in content.get("events", []) ] signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False + destination, events, + room_version=room_version, + outlier=False, ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 37d29e702739..9a933b2a6163 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -25,7 +25,7 @@ from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, RoomVersions from synapse.api.errors import ( AuthError, FederationError, @@ -178,14 +178,12 @@ def _handle_incoming_transaction(self, origin, transaction, request_time): continue try: - # In future we will actually use the room version to parse the - # PDU into an event. - yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version(room_id) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue - event = event_from_pdu_json(p) + event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} @@ -370,7 +368,14 @@ def on_make_join_request(self, origin, room_id, user_id, supported_versions): @defer.inlineCallbacks def on_invite_request(self, origin, content, room_version): - pdu = event_from_pdu_json(content) + # Note: If the room_version is `None` then it came from the v1 invite + # API, and so the version may be either v1 or v2. However, they both + # have the same event format so we can assume v1 for the purposes of + # parsing the event. + if room_version: + pdu = event_from_pdu_json(content, room_version) + else: + pdu = event_from_pdu_json(content, RoomVersions.V1) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) ret_pdu = yield self.handler.on_invite_request(origin, pdu) @@ -380,7 +385,8 @@ def on_invite_request(self, origin, content, room_version): @defer.inlineCallbacks def on_send_join_request(self, origin, content): logger.debug("on_send_join_request: content: %s", content) - pdu = event_from_pdu_json(content) + room_version = yield self.store.get_room_version(content["room_id"]) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) @@ -406,7 +412,9 @@ def on_make_leave_request(self, origin, room_id, user_id): @defer.inlineCallbacks def on_send_leave_request(self, origin, content): logger.debug("on_send_leave_request: content: %s", content) - pdu = event_from_pdu_json(content) + + room_version = yield self.store.get_room_version(content["room_id"]) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) @@ -452,13 +460,17 @@ def on_query_auth_request(self, origin, content, room_id, event_id): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) + room_version = yield self.store.get_room_version(room_id) + auth_chain = [ - event_from_pdu_json(e) + event_from_pdu_json(e, room_version) for e in content["auth_chain"] ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True + origin, auth_chain, + room_version=room_version, + outlier=True, ) ret = yield self.handler.on_query_auth( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb223..33cf8321d814 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -334,6 +334,8 @@ def on_receive_pdu( # we don't need this any more, let's delete it. del ours + room_version = yield self.store.get_room_version(room_id) + # Ask the remote server for the states we don't # know about for p in prevs - seen: @@ -355,7 +357,9 @@ def on_receive_pdu( # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, outlier=True, + [origin], p, + room_version=room_version, + outlier=True, ) if remote_event is None: @@ -379,7 +383,6 @@ def on_receive_pdu( for x in remote_state: event_map[x.event_id] = x - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), @@ -1060,7 +1063,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content): """ logger.debug("Joining %s to %s", joinee, room_id) - origin, event = yield self._make_and_verify_event( + origin, event, room_version = yield self._make_and_verify_event( target_hosts, room_id, joinee, @@ -1083,7 +1086,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content): handled_events = set() try: - event = self._sign_event(event) + event = self._sign_event(room_version, event) # Try the host we successfully got a response to /make_join/ # request first. try: @@ -1091,7 +1094,9 @@ def do_invite_join(self, target_hosts, room_id, joinee, content): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.federation_client.send_join(target_hosts, event) + ret = yield self.federation_client.send_join( + target_hosts, event, room_version, + ) origin = ret["origin"] state = ret["state"] @@ -1118,7 +1123,8 @@ def do_invite_join(self, target_hosts, room_id, joinee, content): pass yield self._persist_auth_tree( - origin, auth_chain, state, event + origin, auth_chain, state, event, + room_version=room_version, ) logger.debug("Finished joining %s to %s", joinee, room_id) @@ -1164,13 +1170,18 @@ def on_make_join_request(self, room_id, user_id): """ event_content = {"membership": Membership.JOIN} - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": event_content, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": event_content, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) try: event, context = yield self.event_creation_handler.create_new_client_event( @@ -1304,16 +1315,17 @@ def on_invite_request(self, origin, pdu): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( + origin, event, room_version = yield self._make_and_verify_event( target_hosts, room_id, user_id, "leave" ) + # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event = self._sign_event(event) + event = self._sign_event(room_version, event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1336,7 +1348,7 @@ def do_remotely_reject_invite(self, target_hosts, room_id, user_id): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={}, params=None): - origin, pdu = yield self.federation_client.make_membership_event( + origin, pdu, room_version = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, @@ -1355,13 +1367,13 @@ def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, assert(event.user_id == user_id) assert(event.state_key == user_id) assert(event.room_id == room_id) - defer.returnValue((origin, event)) + defer.returnValue((origin, event, room_version)) - def _sign_event(self, event): + def _sign_event(self, room_version, event): event.internal_metadata.outlier = False builder = self.event_builder_factory.new( - unfreeze(event.get_pdu_json()) + room_version, unfreeze(event.get_pdu_json()) ) builder.event_id = self.event_builder_factory.create_event_id() @@ -1385,13 +1397,17 @@ def on_make_leave_request(self, room_id, user_id): leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": {"membership": Membership.LEAVE}, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": {"membership": Membership.LEAVE}, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, @@ -1625,7 +1641,7 @@ def prep(ev_info): ) @defer.inlineCallbacks - def _persist_auth_tree(self, origin, auth_events, state, event): + def _persist_auth_tree(self, origin, auth_events, state, event, room_version): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event separately. Notifies about the persisted events @@ -1638,6 +1654,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event): auth_events (list) state (list) event (Event) + room_version (str): Version of the room the event belongs to Returns: Deferred @@ -1669,6 +1686,7 @@ def _persist_auth_tree(self, origin, auth_events, state, event): m_ev = yield self.federation_client.get_pdu( [origin], e_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -2279,14 +2297,16 @@ def exchange_third_party_invite( } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): - builder = self.event_builder_factory.new(event_dict) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new(room_version, event_dict) + EventValidator().validate_new(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) try: @@ -2317,14 +2337,18 @@ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): Returns: Deferred: resolves (to None) """ - builder = self.event_builder_factory.new(event_dict) + room_version = yield self.store.get_room_version(room_id) + + # NB: event_dict has a particular format we might need to fudge if we + # change event formats. + builder = self.event_builder_factory.new(room_version, event_dict) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) try: @@ -2344,7 +2368,8 @@ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks - def add_display_name_to_third_party_invite(self, event_dict, event, context): + def add_display_name_to_third_party_invite(self, room_version, event_dict, + event, context): key = ( EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"] @@ -2368,7 +2393,7 @@ def add_display_name_to_third_party_invite(self, event_dict, event, context): # auth checks. If we need the invite and don't have it then the # auth check code will explode appropriately. - builder = self.event_builder_factory.new(event_dict) + builder = self.event_builder_factory.new(room_version, event_dict) EventValidator().validate_new(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a7cd779b0223..7aaa4fba33ab 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -278,7 +278,15 @@ def create_event(self, requester, event_dict, token_id=None, txn_id=None, """ yield self.auth.check_auth_blocking(requester.user.to_string()) - builder = self.event_builder_factory.new(event_dict) + if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "": + room_version = event_dict["content"]["room_version"] + else: + try: + room_version = yield self.store.get_room_version(event_dict["room_id"]) + except NotFoundError: + raise AuthError(403, "Unknown room") + + builder = self.event_builder_factory.new(room_version, event_dict) self.validator.validate_new(builder) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 64a79da1627a..49c52f1a00ec 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -17,7 +17,7 @@ from twisted.internet import defer -from synapse.events import FrozenEvent +from synapse.events import event_type_from_room_version from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -70,6 +70,7 @@ def _serialize_payload(store, event_and_contexts, backfilled): event_payloads.append({ "event": event.get_pdu_json(), + "room_version": event.room_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, "context": serialized_context, @@ -94,9 +95,12 @@ def _handle_request(self, request): event_and_contexts = [] for event_payload in event_payloads: event_dict = event_payload["event"] + room_version = event_payload["room_version"] internal_metadata = event_payload["internal_metadata"] rejected_reason = event_payload["rejected_reason"] - event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + event = event_type_from_room_version(room_version)( + event_dict, internal_metadata, rejected_reason, + ) context = yield EventContext.deserialize( self.store, event_payload["context"], diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 5b52c91650a8..2f25f2c58cf3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,7 +17,7 @@ from twisted.internet import defer -from synapse.events import FrozenEvent +from synapse.events import event_type_from_room_version from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -74,6 +74,7 @@ def _serialize_payload(event_id, store, event, context, requester, payload = { "event": event.get_pdu_json(), + "room_version": event.room_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, "context": serialized_context, @@ -90,9 +91,12 @@ def _handle_request(self, request, event_id): content = parse_json_object_from_request(request) event_dict = content["event"] + room_version = content["room_version"] internal_metadata = content["internal_metadata"] rejected_reason = content["rejected_reason"] - event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + event = event_type_from_room_version(room_version)( + event_dict, internal_metadata, rejected_reason, + ) requester = Requester.deserialize(self.store, content["requester"]) context = yield EventContext.deserialize(self.store, content["context"]) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 79e0276de622..3e1915fb87c7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1268,6 +1268,7 @@ def event_dict(event): event.internal_metadata.get_dict() ), "json": encode_json(event_dict(event)), + "format_version": event.format_version, } for event, _ in events_and_contexts ], diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a8326f5296a6..e7d1eae53015 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -21,10 +21,10 @@ from twisted.internet import defer +from synapse.api.constants import EventFormatVersions from synapse.api.errors import NotFoundError +from synapse.events import event_type_from_format_version # these are only included to make the type annotations work -from synapse.events import EventBase # noqa: F401 -from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process @@ -180,8 +180,6 @@ def _get_events(self, event_ids, check_redacted=True, else: event = entry.event - events.append(event) - if get_prev_content: if "replaces_state" in event.unsigned: prev = yield self.get_event( @@ -194,6 +192,8 @@ def _get_events(self, event_ids, check_redacted=True, event.unsigned["prev_content"] = prev.content event.unsigned["prev_sender"] = prev.sender + events.append(event) + defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): @@ -353,6 +353,7 @@ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], + format_version=row["format_version"], ) for row in rows ], @@ -377,6 +378,7 @@ def _fetch_event_rows(self, txn, events): " e.event_id as event_id, " " e.internal_metadata," " e.json," + " e.format_version, " " r.redacts as redacts," " rej.event_id as rejects " " FROM event_json as e" @@ -392,7 +394,7 @@ def _fetch_event_rows(self, txn, events): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): + format_version, rejected_reason=None): with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -405,8 +407,13 @@ def _get_event_from_row(self, internal_metadata, js, redacted, desc="_get_event_from_row_rejected_reason", ) - original_ev = FrozenEvent( - d, + if format_version is None: + # This means that we stored the event before we had the concept + # of a event format version, so it must be a V1 event. + format_version = EventFormatVersions.V1 + + original_ev = event_type_from_format_version(format_version)( + event_dict=d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index fa36daac524d..e042221774bf 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 53 +SCHEMA_VERSION = 54 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/54/event_format_version.sql b/synapse/storage/schema/delta/54/event_format_version.sql new file mode 100644 index 000000000000..1d977c2834c0 --- /dev/null +++ b/synapse/storage/schema/delta/54/event_format_version.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE event_json ADD COLUMN format_version INTEGER; diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 02bf975fbf7c..3957561b1ebf 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -18,7 +18,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.types import RoomID, UserID from tests import unittest @@ -52,6 +52,7 @@ def inject_room_member( content = {"membership": membership} content.update(extra_content) builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": EventTypes.Member, "sender": user.to_string(), @@ -74,6 +75,7 @@ def inject_message(self, room, user, body): self.depth += 1 builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": EventTypes.Message, "sender": user.to_string(), @@ -94,6 +96,7 @@ def inject_message(self, room, user, body): @defer.inlineCallbacks def inject_redaction(self, room, event_id, user, reason): builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": EventTypes.Redaction, "sender": user.to_string(), diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 978c66133d24..7fa2f4fd70fd 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -18,7 +18,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.types import RoomID, UserID from tests import unittest @@ -50,6 +50,7 @@ def setUp(self): @defer.inlineCallbacks def inject_room_member(self, room, user, membership, replaces_state=None): builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": EventTypes.Member, "sender": user.to_string(), diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 086a39d834ce..a1f99134dc73 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -17,7 +17,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.storage.state import StateFilter from synapse.types import RoomID, UserID @@ -52,6 +52,7 @@ def setUp(self): @defer.inlineCallbacks def inject_state_event(self, room, sender, typ, state_key, content): builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": typ, "sender": sender.to_string(), diff --git a/tests/test_visibility.py b/tests/test_visibility.py index 2eea3b098b1f..82d63ce00e4b 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -17,6 +17,7 @@ from twisted.internet import defer from twisted.internet.defer import succeed +from synapse.api.constants import RoomVersions from synapse.events import FrozenEvent from synapse.visibility import filter_events_for_server @@ -124,6 +125,7 @@ def test_erased_user(self): def inject_visibility(self, user_id, visibility): content = {"history_visibility": visibility} builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": "m.room.history_visibility", "sender": user_id, @@ -144,6 +146,7 @@ def inject_room_member(self, user_id, membership="join", extra_content={}): content = {"membership": membership} content.update(extra_content) builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": "m.room.member", "sender": user_id, @@ -165,6 +168,7 @@ def inject_message(self, user_id, content=None): if content is None: content = {"body": "testytest"} builder = self.event_builder_factory.new( + RoomVersions.V1, { "type": "m.room.message", "sender": user_id, diff --git a/tests/utils.py b/tests/utils.py index 08d6faa0a627..1aa8f9809422 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -26,7 +26,7 @@ from twisted.internet import defer, reactor -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, RoomVersions from synapse.api.errors import CodeMessageException, cs_error from synapse.config.server import ServerConfig from synapse.federation.transport import server @@ -622,6 +622,7 @@ def create_room(hs, room_id, creator_id): event_creation_handler = hs.get_event_creation_handler() builder = event_builder_factory.new( + RoomVersions.V1, { "type": EventTypes.Create, "state_key": "",