Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Persist auth/state events at backwards extremities when we fetch them (
Browse files Browse the repository at this point in the history
…#6526)

* commit 'ff773ff72':
  Persist auth/state events at backwards extremities when we fetch them (#6526)
  • Loading branch information
anoadragon453 committed Mar 19, 2020
2 parents 4da8965 + ff773ff commit da06324
Showing 1 changed file with 14 additions and 39 deletions.
53 changes: 14 additions & 39 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import UserID, get_domain_from_id
from synapse.util import batch_iter, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
Expand Down Expand Up @@ -238,7 +237,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
return None

state = None
auth_chain = []

# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
Expand Down Expand Up @@ -342,7 +340,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:

# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
auth_chains = set()
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
Expand All @@ -366,24 +363,14 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
p,
)

room_version = await self.store.get_room_version(room_id)

with nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
(
remote_state,
got_auth_chain,
) = await self._get_state_for_room(
(remote_state, _,) = await self._get_state_for_room(
origin, room_id, p, include_event_in_state=True
)

# XXX hrm I'm not convinced that duplicate events will compare
# for equality, so I'm not sure this does what the author
# hoped.
auth_chains.update(got_auth_chain)

remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
Expand All @@ -392,6 +379,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
for x in remote_state:
event_map[x.event_id] = x

room_version = await self.store.get_room_version(room_id)
state_map = await resolve_events_with_store(
room_id,
room_version,
Expand All @@ -413,7 +401,6 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
event_map.update(evs)

state = [event_map[e] for e in six.itervalues(state_map)]
auth_chain = list(auth_chains)
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
Expand All @@ -429,9 +416,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
affected=event_id,
)

await self._process_received_pdu(
origin, pdu, state=state, auth_chain=auth_chain
)
await self._process_received_pdu(origin, pdu, state=state)

async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
Expand Down Expand Up @@ -633,6 +618,8 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
room_id (str)
event_ids (Iterable[str])
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
Expand All @@ -652,27 +639,15 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
room_id,
)

room_version = yield self.store.get_room_version(room_id)

# XXX 20 requests at once? really?
for batch in batch_iter(missing_events, 20):
deferreds = [
run_in_background(
self.federation_client.get_pdu,
destinations=[destination],
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]

res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
yield self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)

for success, result in res:
if success and result:
fetched_events[result.event_id] = result
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(yield self.store.get_events(missing_events, allow_rejected=True))
)

# check for events which were in the wrong room.
#
Expand Down Expand Up @@ -702,7 +677,7 @@ def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
return fetched_events

@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
def _process_received_pdu(self, origin, event, state):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
Expand Down

0 comments on commit da06324

Please sign in to comment.