Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
fed server: refactor on_incoming_transaction
Browse files Browse the repository at this point in the history
Move as much as possible to after the have_responded check, and reduce the
number of times we iterate over the pdu list.
  • Loading branch information
richvdh committed Oct 9, 2017
1 parent c7b0678 commit ba5b9b8
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,11 @@ def on_backfill_request(self, origin, room_id, versions, limit):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data)

received_pdus_counter.inc_by(len(transaction.pdus))

for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
p["age"] = unsigned["age"]
if "age" in p:
p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
del p["age"]
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = int(self._clock.time_msec())

pdu_list = [
self.event_from_pdu_json(p) for p in transaction.pdus
]
transaction = Transaction(**transaction_data)

logger.debug("[%s] Got transaction", transaction.transaction_id)

Expand All @@ -140,17 +129,35 @@ def on_incoming_transaction(self, transaction_data):

logger.debug("[%s] Transaction is new", transaction.transaction_id)

results = []
received_pdus_counter.inc_by(len(transaction.pdus))

pdu_list = []

for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
p["age"] = unsigned["age"]
if "age" in p:
p["age_ts"] = request_time - int(p["age"])
del p["age"]

event = self.event_from_pdu_json(p)
pdu_list.append(event)

pdu_results = {}

for pdu in pdu_list:
event_id = pdu.event_id
try:
yield self._handle_received_pdu(transaction.origin, pdu)
results.append({})
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
self.send_failure(e, transaction.origin)
results.append({"error": str(e)})
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
results.append({"error": str(e)})
pdu_results[event_id] = {"error": str(e)}
logger.exception("Failed to handle PDU")

if hasattr(transaction, "edus"):
Expand All @@ -164,14 +171,12 @@ def on_incoming_transaction(self, transaction_data):
for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure)

logger.debug("Returning: %s", str(results))

response = {
"pdus": dict(zip(
(p.event_id for p in pdu_list), results
)),
"pdus": pdu_results,
}

logger.debug("Returning: %s", str(response))

yield self.transaction_actions.set_response(
transaction,
200, response
Expand Down

0 comments on commit ba5b9b8

Please sign in to comment.