diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb3d9258a6a5..90235ff098bc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -303,18 +303,10 @@ def _attempt_new_transaction(self, destination): try: self.pending_transactions[destination] = 1 + # XXX: what's this for? yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - limiter = yield get_retry_limiter( destination, self.clock, @@ -326,6 +318,24 @@ def _attempt_new_transaction(self, destination): yield self._get_new_device_messages(destination) ) + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # pending_transactions flag. + + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( @@ -355,6 +365,8 @@ def _attempt_new_transaction(self, destination): ) return + # END CRITICAL SECTION + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, limiter=limiter,