From 23d9bd1d745a037202bb9a134cdb848eb65a01e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:39:40 +0000 Subject: [PATCH 1/2] Process transactions serially. Since the events received in a transaction are ordered, later events might depend on earlier events and so we shouldn't blindly process them in parellel. --- synapse/federation/federation_server.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b966383130..7ee37fb34d09 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -112,17 +112,23 @@ def on_incoming_transaction(self, transaction_data): logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): - dl = [] + results = [] + for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) def handle_failure(failure): failure.trap(FederationError) self.send_failure(failure.value, transaction.origin) + return failure d.addErrback(handle_failure) - dl.append(d) + try: + yield d + results.append({}) + except Exception as e: + results.append({"error": str(e)}) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -135,21 +141,11 @@ def handle_failure(failure): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - results = yield defer.DeferredList(dl, consumeErrors=True) - - ret = [] - for r in results: - if r[0]: - ret.append({}) - else: - logger.exception(r[1]) - ret.append({"error": str(r[1].value)}) - - logger.debug("Returning: %s", str(ret)) + logger.debug("Returning: %s", str(results)) response = { "pdus": dict(zip( - (p.event_id for p in pdu_list), ret + (p.event_id for p in pdu_list), results )), } From 29481690c5b296a1c8aee3068d32ef083ef227f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:50:43 +0000 Subject: [PATCH 2/2] If we're yielding don't add errback --- synapse/federation/federation_server.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ee37fb34d09..bc9bac809aa0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -117,16 +117,12 @@ def on_incoming_transaction(self, transaction_data): for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) - def handle_failure(failure): - failure.trap(FederationError) - self.send_failure(failure.value, transaction.origin) - return failure - - d.addErrback(handle_failure) - try: yield d results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) except Exception as e: results.append({"error": str(e)})