diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 90f95e7b8b3..711136401a5 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -21,10 +21,10 @@ use std::any::Any; use std::convert::Infallible; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Instant; -use quickwit_common::metrics::IntCounter; +use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge}; use tokio::sync::oneshot; use crate::channel_with_priority::{Receiver, Sender, TrySendError}; @@ -311,39 +311,44 @@ impl<A: Actor> Mailbox<A> { } } +struct InboxInner<A: Actor> { + rx: Receiver<Envelope<A>>, + _inboxes_count_gauge_guard: GaugeGuard<'static>, +} + pub struct Inbox<A: Actor> { - rx: Arc<Receiver<Envelope<A>>>, + inner: Arc<InboxInner<A>>, } impl<A: Actor> Clone for Inbox<A> { fn clone(&self) -> Self { Inbox { - rx: self.rx.clone(), + inner: self.inner.clone(), } } } impl<A: Actor> Inbox<A> { pub(crate) fn is_empty(&self) -> bool { - self.rx.is_empty() + self.inner.rx.is_empty() } pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> { - self.rx.recv().await + self.inner.rx.recv().await } pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> { - self.rx.recv_high_priority().await + self.inner.rx.recv_high_priority().await } pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> { - self.rx.try_recv() + self.inner.rx.try_recv() } #[cfg(any(test, feature = "testsuite"))] pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> { loop { - match self.rx.recv().await { + match self.inner.rx.recv().await { Ok(mut envelope) => { if let Some(msg) = envelope.message_typed() { return Ok(msg); @@ -362,7 +367,8 @@ impl<A: Actor> Inbox<A> { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> { - self.rx + self.inner + .rx .drain_low_priority() .into_iter() .map(|mut envelope| envelope.message()) @@ -375,7 +381,8 @@ impl<A: Actor> Inbox<A> { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> { - self.rx + self.inner + .rx .drain_low_priority() .into_iter() .flat_map(|mut envelope| envelope.message_typed()) @@ -383,6 +390,19 @@ impl<A: Actor> Inbox<A> { } } +fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> { + static INBOX_GAUGE: std::sync::OnceLock<IntGauge> = OnceLock::new(); + let gauge = INBOX_GAUGE.get_or_init(|| { + quickwit_common::metrics::new_gauge( + "inboxes_count", + "overall count of actors", + "actor", + &[], + ) + }); + GaugeGuard::from_gauge(gauge) +} + pub(crate) fn create_mailbox<A: Actor>( actor_name: String, queue_capacity: QueueCapacity, @@ -398,7 +418,13 @@ pub(crate) fn create_mailbox<A: Actor>( }), ref_count, }; - let inbox = Inbox { rx: Arc::new(rx) }; + let inner = InboxInner { + rx, + _inboxes_count_gauge_guard: get_actor_inboxes_count_gauge_guard(), + }; + let inbox = Inbox { + inner: Arc::new(inner), + }; (mailbox, inbox) }