Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Factor _get_missing_events_for_pdu out of _handle_new_pdu
Browse files Browse the repository at this point in the history
This should be functionally identical: it just seeks to improve readability by
reducing indentation.
  • Loading branch information
richvdh committed Mar 9, 2017
1 parent 45d173a commit 3406333
Showing 1 changed file with 82 additions and 62 deletions.
144 changes: 82 additions & 62 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,68 +574,9 @@ def _handle_new_pdu(self, origin, pdu, get_missing=True):
pdu.room_id, len(prevs - seen),
)

# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())

if prevs - seen:
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)

# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen

logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)

# XXX: we set timeout to 10s to help workaround
# https://github.com/matrix-org/synapse/issues/1733.
# The reason is to avoid holding the linearizer lock
# whilst processing inbound /send transactions, causing
# FDs to stack up and block other inbound transactions
# which empirically can currently take up to 30 minutes.
#
# N.B. this explicitly disables retry attempts.
#
# N.B. this also increases our chances of falling back to
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
# causing additional state resolution? See #1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744

missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)

for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
)

have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
yield self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth
)

prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
Expand Down Expand Up @@ -667,6 +608,85 @@ def _handle_new_pdu(self, origin, pdu, get_missing=True):
auth_chain=auth_chain,
)

@defer.inlineCallbacks
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
pdu: received pdu
prevs (str[]): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
Returns:
Deferred<dict(str, str?)>: updated have_seen dictionary
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())

if not prevs - seen:
# nothing left to do
defer.returnValue(have_seen)

latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)

# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen

logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)

# XXX: we set timeout to 10s to help workaround
# https://github.com/matrix-org/synapse/issues/1733.
# The reason is to avoid holding the linearizer lock
# whilst processing inbound /send transactions, causing
# FDs to stack up and block other inbound transactions
# which empirically can currently take up to 30 minutes.
#
# N.B. this explicitly disables retry attempts.
#
# N.B. this also increases our chances of falling back to
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
# causing additional state resolution? See #1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744

missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)

for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
)

have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
defer.returnValue(have_seen)

def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

Expand Down

0 comments on commit 3406333

Please sign in to comment.