Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

_auth_and_persist_outliers: drop events we have already seen #11994

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11994.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move common deduplication code down into `_auth_and_persist_outliers`.
44 changes: 20 additions & 24 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,6 @@ async def process_remote_join(
Raises:
SynapseError if the response is in some way invalid.
"""
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}

create_event = None
for e in auth_events:
if (e.type, e.state_key) == (EventTypes.Create, ""):
Expand All @@ -439,11 +437,6 @@ async def process_remote_join(
if room_version.identifier != room_version_id:
raise SynapseError(400, "Room version mismatch")

# filter out any events we have already seen
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
for s in seen_remotes:
event_map.pop(s, None)

# persist the auth chain and state events.
#
# any invalid events here will be marked as rejected, and we'll carry on.
Expand All @@ -455,7 +448,9 @@ async def process_remote_join(
# signatures right now doesn't mean that we will *never* be able to, so it
# is premature to reject them.
#
await self._auth_and_persist_outliers(room_id, event_map.values())
await self._auth_and_persist_outliers(
room_id, itertools.chain(auth_events, state)
)

# and now persist the join event itself.
logger.info("Peristing join-via-remote %s", event)
Expand Down Expand Up @@ -1245,6 +1240,16 @@ async def _auth_and_persist_outliers(
"""
event_map = {event.event_id: event for event in events}

# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (e.g., during a room join), or because
# another thread has raced against us since we decided to request the event.
#
# This is just an optimisation, so it doesn't need to be watertight - the event
# persister does another round of deduplication.
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
for s in seen_remotes:
event_map.pop(s, None)

# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
Expand Down Expand Up @@ -1717,31 +1722,22 @@ async def _get_remote_auth_chain_for_event(
event_id: the event for which we are lacking auth events
"""
try:
remote_event_map = {
e.event_id: e
for e in await self._federation_client.get_event_auth(
destination, room_id, event_id
)
}
remote_events = await self._federation_client.get_event_auth(
destination, room_id, event_id
)

except RequestSendFailed as e1:
# The other side isn't around or doesn't implement the
# endpoint, so let's just bail out.
logger.info("Failed to get event auth from remote: %s", e1)
return

logger.info("/event_auth returned %i events", len(remote_event_map))
logger.info("/event_auth returned %i events", len(remote_events))

# `event` may be returned, but we should not yet process it.
remote_event_map.pop(event_id, None)

# nor should we reprocess any events we have already seen.
seen_remotes = await self._store.have_seen_events(
room_id, remote_event_map.keys()
)
for s in seen_remotes:
remote_event_map.pop(s, None)
remote_events = (e for e in remote_events if e.event_id != event_id)

await self._auth_and_persist_outliers(room_id, remote_event_map.values())
await self._auth_and_persist_outliers(room_id, remote_events)

async def _update_context_for_auth_events(
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
Expand Down