From b2a1fc2a713267abc88293eb7ff1be27fd9177d6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 27 May 2021 11:05:27 -0400 Subject: [PATCH 1/3] Limit the number of events sent over replication when persisting events. --- synapse/handlers/federation.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 678f6b770797..6e80d32cf324 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -91,6 +91,7 @@ get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -3053,13 +3054,15 @@ async def persist_events_and_notify( """ instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: - result = await self._send_events( - instance_name=instance, - store=self.store, - room_id=room_id, - event_and_contexts=event_and_contexts, - backfilled=backfilled, - ) + # Limit the number of events sent over federation. + for batch in batch_iter(event_and_contexts, 50): + result = await self._send_events( + instance_name=instance, + store=self.store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) return result["max_stream_id"] else: assert self.storage.persistence From f9076dd592fc539024120377251866fb067d5747 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 27 May 2021 16:24:15 +0100 Subject: [PATCH 2/3] Changelog --- changelog.d/10082.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10082.bugfix diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix new file mode 100644 index 000000000000..b4f8bcc4fa3c --- /dev/null +++ b/changelog.d/10082.bugfix @@ -0,0 +1 @@ +Fixed a bug causing replication requests to fail when receiving a lot of events via federation. From ee3593c8d49cf59f855f211457c5f2c4dc392424 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 27 May 2021 18:06:15 +0200 Subject: [PATCH 3/3] Update synapse/handlers/federation.py Co-authored-by: Erik Johnston --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6e80d32cf324..bf113152517f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -3055,7 +3055,7 @@ async def persist_events_and_notify( instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: # Limit the number of events sent over federation. - for batch in batch_iter(event_and_contexts, 50): + for batch in batch_iter(event_and_contexts, 1000): result = await self._send_events( instance_name=instance, store=self.store,