From 2e57d40789c1b27391eb71afd8cfeaaf1f399e67 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 12:23:31 +0100 Subject: [PATCH 1/2] Add some metrics to staging area --- .../databases/main/event_federation.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f2d27ee89305..08d75b0d41bd 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,6 +16,8 @@ from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from prometheus_client import Gauge + from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion @@ -32,6 +34,16 @@ from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter +oldest_pdu_in_federation_staging = Gauge( + "synapse_federation_server_oldest_inbound_pdu_in_staging", + "The age in seconds since we received the oldest pdu in the federation staging area", +) + +number_pdus_in_federation_queue = Gauge( + "synapse_federation_server_number_inbound_pdu_in_staging", + "The total number of events in the inbound federation staging", +) + logger = logging.getLogger(__name__) @@ -54,6 +66,8 @@ def __init__(self, database: DatabasePool, db_conn, hs): 500000, "_event_auth_cache", size_callback=len ) # type: LruCache[str, List[Tuple[str, int]]] + self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + async def get_auth_chain( self, room_id: str, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: @@ -1193,6 +1207,31 @@ def _get_next_staged_event_for_room_txn(txn): return origin, event + @wrap_as_background_process("_get_stats_for_federation_staging") + async def _get_stats_for_federation_staging(self): + """Update the prometheus metrics for the inbound federation staging area.""" + + def _get_stats_for_federation_staging_txn(txn): + txn.execute( + "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging" + ) + (count,) = txn.fetchone() + + txn.execute( + "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" + ) + + (age,) = txn.fetchone() + + return count, age + + count, age = await self.db_pool.runInteraction( + "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn + ) + + number_pdus_in_federation_queue.set(count) + oldest_pdu_in_federation_staging.set(age) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated From e590aa407541830fbdbc2bada4c55cad03419def Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 14:37:34 +0100 Subject: [PATCH 2/2] Newsfile --- changelog.d/10284.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10284.feature diff --git a/changelog.d/10284.feature b/changelog.d/10284.feature new file mode 100644 index 000000000000..379155e8cf54 --- /dev/null +++ b/changelog.d/10284.feature @@ -0,0 +1 @@ +Add metrics for new inbound federation staging area.