From e3a720217a9d200a7c3db8305df53ef8bf76f565 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:47:37 +0100 Subject: [PATCH 1/8] Add /state_ids federation API The new API only returns the event_ids for the state, as most requesters will already have the vast majority of the events already. --- synapse/federation/federation_client.py | 73 ++++++++++++++++++++++++- synapse/federation/federation_server.py | 21 +++++++ synapse/federation/transport/client.py | 22 ++++++++ synapse/federation/transport/server.py | 12 ++++ 4 files changed, 125 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c86..03f6133e61b1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,9 +314,32 @@ def get_state_for_room(self, destination, room_id, event_id): Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdus"] + auth_event_ids = result.get("auth_chain", []) + + event_map, _failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] @@ -339,6 +362,50 @@ def get_state_for_room(self, destination, room_id, event_id): defer.returnValue((signed_pdus, signed_auth)) + @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = [] + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + for i in xrange(0, len(missing_events), batch_size): + batch = missing_events[i:i + batch_size] + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ).addBoth(lambda r, e: (r, e), e_id) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in res: + if result and val: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) + + defer.returnValue((signed_events, failed_to_fetch)) + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bdb2e..40e9fda0eb60 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -214,6 +214,27 @@ def on_context_state_request(self, origin, room_id, event_id): defer.returnValue((200, resp)) + @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdus": [pdu.event_id for pdu in pdus], + "auth_chain": [pdu.event_id for pdu in auth_chain], + })) + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e27880..3d088e43cbdb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -54,6 +54,28 @@ def get_room_state(self, destination, room_id, event_id): destination, path=path, args={"event_id": event_id}, ) + @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae840a..3ae7c48457dc 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ def on_GET(self, origin, content, query, context): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P[^/]*)/" @@ -538,6 +549,7 @@ def on_GET(self, origin, content, query): FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, From a60a2eaa02f454dbc450cf821f6cd1c6b0b93993 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:52:43 +0100 Subject: [PATCH 2/8] Comment --- synapse/federation/federation_client.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 03f6133e61b1..0491f1c3fec3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -364,6 +364,20 @@ def get_state_for_room(self, destination, room_id, event_id): @defer.inlineCallbacks def get_events(self, destinations, room_id, event_ids, return_local=True): + """Fetch events from some remote destinations, checking if we already + have them. + + Args: + destinations (list) + room_id (str) + event_ids (list) + return_local (bool): Whether to include events we already have in + the DB in the returned list of events + + Returns: + Deferred: A deferred resolving to a 2-tuple where the first is a list of + events and the second is a list of event ids that we failed to fetch. + """ if return_local: seen_events = yield self.store.get_events(event_ids) signed_events = seen_events.values() From 520ee9bd2c91a75eb1dc7ed923016967856c6bdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:03:15 +0100 Subject: [PATCH 3/8] Fix syntax error --- synapse/federation/federation_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0491f1c3fec3..6c626cf12c0f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -325,10 +325,17 @@ def get_state_for_room(self, destination, room_id, event_id): state_event_ids = result["pdus"] auth_event_ids = result.get("auth_chain", []) - event_map, _failed_to_fetch = yield self.get_events( + fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) ) + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + pdus = [event_map[e_id] for e_id in state_event_ids] auth_chain = [event_map[e_id] for e_id in auth_event_ids] From 4c56bedee3bb63d7035fca4a1a092b11de0b18cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:04:29 +0100 Subject: [PATCH 4/8] Actually call get_room_state --- synapse/federation/federation_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6c626cf12c0f..7eadcdd28c19 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -348,6 +348,10 @@ def get_state_for_room(self, destination, room_id, event_id): else: raise e + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, + ) + pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] From bcc9cda8ca75b5cc381ce10ba9b8e4af56c6bdaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:17:26 +0100 Subject: [PATCH 5/8] Fix copy + paste fails --- synapse/federation/federation_client.py | 15 ++++++++++----- synapse/federation/federation_server.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7eadcdd28c19..dde10967be2a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,8 +411,13 @@ def random_server_list(): return srvs batch_size = 20 - for i in xrange(0, len(missing_events), batch_size): - batch = missing_events[i:i + batch_size] + while missing_events: + batch = [] + try: + for _ in range(0, batch_size): + batch.append(missing_events.pop()) + except KeyError: + pass deferreds = [ self.get_pdu( @@ -423,9 +428,9 @@ def random_server_list(): ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for (result, val), (e_id, _) in res: - if result and val: - signed_events.append(val) + for success, (result, e_id) in res: + if success and result: + signed_events.append(result) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 40e9fda0eb60..35a01eeccaf9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -623,7 +623,7 @@ def _handle_new_pdu(self, origin, pdu, get_missing=True): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, From edb33eb163b6c60bfd2c3cab78a6bd13a47b6702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:19:15 +0100 Subject: [PATCH 6/8] Rename fields to _ids --- synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dde10967be2a..264f3c0aece3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -322,8 +322,8 @@ def get_state_for_room(self, destination, room_id, event_id): destination, room_id, event_id=event_id, ) - state_event_ids = result["pdus"] - auth_event_ids = result.get("auth_chain", []) + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35a01eeccaf9..2b91f93e0954 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -231,8 +231,8 @@ def on_state_ids_request(self, origin, room_id, event_id): ) defer.returnValue((200, { - "pdus": [pdu.event_id for pdu in pdus], - "auth_chain": [pdu.event_id for pdu in auth_chain], + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], })) @defer.inlineCallbacks From e3ee63578f335037c73675209bb7861045c2027a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:01:18 +0100 Subject: [PATCH 7/8] Tidy up get_events --- synapse/federation/federation_client.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 264f3c0aece3..ae0d650700ca 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,28 +411,26 @@ def random_server_list(): return srvs batch_size = 20 - while missing_events: - batch = [] - try: - for _ in range(0, batch_size): - batch.append(missing_events.pop()) - except KeyError: - pass + missing_events = len(missing_events) + for i in xrange(0, batch_size, batch_size): + batch = set(missing_events[i:i + batch_size]) deferreds = [ self.get_pdu( destinations=random_server_list(), event_id=e_id, - ).addBoth(lambda r, e: (r, e), e_id) + ) for e_id in batch ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for success, (result, e_id) in res: - if success and result: + for success, result in res: + if success: signed_events.append(result) - else: - failed_to_fetch.add(e_id) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) defer.returnValue((signed_events, failed_to_fetch)) From 257c41cc2e7163c42b8bcfa3a29d42ae5ac087b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:05:45 +0100 Subject: [PATCH 8/8] Fix typos. --- synapse/federation/federation_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ae0d650700ca..c6ed720166d9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -396,7 +396,7 @@ def get_events(self, destinations, room_id, event_ids, return_local=True): seen_events = yield self.store.have_events(event_ids) signed_events = [] - failed_to_fetch = [] + failed_to_fetch = set() missing_events = set(event_ids) for k in seen_events: @@ -411,8 +411,8 @@ def random_server_list(): return srvs batch_size = 20 - missing_events = len(missing_events) - for i in xrange(0, batch_size, batch_size): + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [