
Queue up federation PDUs while a room join is in progress #2016

Merged 2 commits on Mar 17, 2017
68 changes: 44 additions & 24 deletions synapse/handlers/federation.py
@@ -14,6 +14,7 @@
# limitations under the License.

"""Contains handlers for federation events."""
import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -114,6 +115,14 @@ def on_receive_pdu(self, origin, pdu, get_missing=True):
logger.debug("Already seen pdu %s", pdu.event_id)
return

# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
logger.info("Ignoring PDU %s for room %s from %s for now; join "
"in progress", pdu.event_id, pdu.room_id, origin)
self.room_queues[pdu.room_id].append((pdu, origin))
return

state = None

auth_chain = []
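
As an aside, not part of the diff itself: the check added to on_receive_pdu above is a simple park-and-return guard keyed on the room. A minimal, self-contained sketch of that pattern, with invented names (Pdu, PduQueueSketch) standing in for synapse's real types:

    # Illustrative sketch only, not synapse code; Pdu stands in for FrozenEvent.
    from collections import namedtuple

    Pdu = namedtuple("Pdu", ["event_id", "room_id"])

    class PduQueueSketch(object):
        def __init__(self):
            # room_id -> list of (pdu, origin) received while a join is in flight
            self.room_queues = {}
            self.processed = []

        def on_receive_pdu(self, origin, pdu):
            if pdu.room_id in self.room_queues:
                # A join for this room is still in progress: park the PDU so
                # it can be replayed once the room's state has been persisted.
                self.room_queues[pdu.room_id].append((pdu, origin))
                return
            self.processed.append(pdu.event_id)

    handler = PduQueueSketch()
    handler.room_queues["!room:example.org"] = []   # a join has started
    handler.on_receive_pdu("other.example.org", Pdu("$abc", "!room:example.org"))
    assert handler.room_queues["!room:example.org"]  # parked, not processed
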
@@ -274,26 +283,13 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):

@log_function
@defer.inlineCallbacks
def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
def _process_received_pdu(self, origin, pdu, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.

auth_chain and state are None if we already have the necessary state
and prev_events in the db
"""
event = pdu

logger.debug("Got event: %s", event.event_id)

# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
self.room_queues[event.room_id].append((pdu, origin))
return

logger.debug("Processing event: %s", event.event_id)

logger.debug("Event: %s", event)
logger.debug("Processing event: %s", event)

# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -862,8 +858,6 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
"""
logger.debug("Joining %s to %s", joinee, room_id)

yield self.store.clean_room_for_join(room_id)

origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
@@ -872,7 +866,15 @@
content,
)

# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
# at a time - so this is just paranoia.
assert (room_id not in self.room_queues)

self.room_queues[room_id] = []

yield self.store.clean_room_for_join(room_id)

handled_events = set()

try:
@@ -925,17 +927,35 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]

for p, origin in room_queue:
if p.event_id in handled_events:
continue
# we don't need to wait for the queued events to be processed -
# it's just a best-effort thing at this point. We do want to do
# them roughly in order, though, otherwise we'll end up making
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.

try:
self._process_received_pdu(origin, p)
except:
logger.exception("Couldn't handle pdu")
synapse.util.logcontext.reset_context_after_deferred(
self._handle_queued_pdus(room_queue))

defer.returnValue(True)

@defer.inlineCallbacks
def _handle_queued_pdus(self, room_queue):
"""Process PDUs which got queued up while we were busy send_joining.

Args:
room_queue (list[FrozenEvent, str]): list of PDUs to be processed
and the servers that sent them
"""
for p, origin in room_queue:
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
p.event_id, origin, e)

@defer.inlineCallbacks
@log_function
def on_make_join_request(self, room_id, user_id):
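
For illustration, not part of the diff: the best-effort, fire-and-forget replay described in the comments above boils down to the following self-contained Twisted sketch. handle_queued_pdus, my_process_pdu and Pdu are invented names, not synapse APIs.

    # Rough sketch of the best-effort replay of queued PDUs; illustrative only.
    import logging
    from collections import namedtuple

    from twisted.internet import defer

    logger = logging.getLogger(__name__)

    # Stand-in for a federation PDU; not synapse's FrozenEvent.
    Pdu = namedtuple("Pdu", ["event_id", "room_id"])

    @defer.inlineCallbacks
    def handle_queued_pdus(process_pdu, room_queue):
        # Replay the queued (pdu, origin) pairs roughly in order. A failure
        # on one PDU is logged and skipped so the rest of the queue still runs.
        for pdu, origin in room_queue:
            try:
                yield process_pdu(origin, pdu)
            except Exception as e:
                logger.warning("Error handling queued PDU %s from %s: %s",
                               pdu.event_id, origin, e)

    def my_process_pdu(origin, pdu):
        logger.info("processing %s from %s", pdu.event_id, origin)
        return defer.succeed(None)

    queue = [(Pdu("$abc", "!room:example.org"), "other.example.org")]
    # Fire it off without waiting on the result: the join has already
    # succeeded, so this is purely best-effort catch-up work.
    handle_queued_pdus(my_process_pdu, queue)

In the PR itself the fired-off deferred is additionally passed through reset_context_after_deferred so that the abandoned deferred cannot leak its log context; that helper is added in synapse/util/logcontext.py below.
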
25 changes: 25 additions & 0 deletions synapse/util/logcontext.py
@@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None):
return d


def reset_context_after_deferred(deferred):
"""If the deferred is incomplete, add a callback which will reset the
context.

This is useful when you want to fire off a deferred, but don't want to
wait for it to complete. (The deferred will restore the current log context
when it completes, so if you don't do anything, it will leak log context.)

(If this feels asymmetric, consider it this way: we are effectively forking
a new thread of execution. We are probably currently within a
``with LoggingContext()`` block, which is supposed to have a single entry
and exit point. But by spawning off another deferred, we are effectively
adding a new exit point.)

Args:
deferred (defer.Deferred): deferred
"""
def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel)
return result

if not deferred.called:
deferred.addBoth(reset_context)


def preserve_fn(f):
"""Ensures that function is called with correct context and that context is
restored after return. Useful for wrapping functions that return a deferred
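
A minimal usage sketch of the new helper (assuming synapse is importable; the background task below is invented for illustration, not taken from the codebase):

    from twisted.internet import defer, reactor

    from synapse.util.logcontext import reset_context_after_deferred

    def do_background_work():
        # Some work that completes later; when it does, its callback chain
        # would normally restore whatever log context it captured.
        d = defer.Deferred()
        reactor.callLater(1, d.callback, None)
        return d

    def kick_off():
        d = do_background_work()
        # Fire and forget: don't wait for the work, but make sure the
        # completing deferred resets the log context to the sentinel
        # instead of leaking it into whoever is running at that point.
        reset_context_after_deferred(d)

This mirrors how do_invite_join uses the helper above: the queued-PDU replay is started, wrapped with reset_context_after_deferred, and then left to complete in the background.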