Skip to content

Commit

Permalink
Adding an actor inbox gauge. (#5481)
Browse files Browse the repository at this point in the history
The goal here is to offer a way to spot a possible leak
(diverging number of actors).
  • Loading branch information
fulmicoton authored Oct 9, 2024
1 parent cc7a97f commit 914a196
Showing 1 changed file with 38 additions and 12 deletions.
50 changes: 38 additions & 12 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use std::any::Any;
use std::convert::Infallible;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::sync::{Arc, OnceLock, Weak};
use std::time::Instant;

use quickwit_common::metrics::IntCounter;
use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge};
use tokio::sync::oneshot;

use crate::channel_with_priority::{Receiver, Sender, TrySendError};
Expand Down Expand Up @@ -311,39 +311,44 @@ impl<A: Actor> Mailbox<A> {
}
}

struct InboxInner<A: Actor> {
rx: Receiver<Envelope<A>>,
_inboxes_count_gauge_guard: GaugeGuard<'static>,
}

pub struct Inbox<A: Actor> {
rx: Arc<Receiver<Envelope<A>>>,
inner: Arc<InboxInner<A>>,
}

impl<A: Actor> Clone for Inbox<A> {
fn clone(&self) -> Self {
Inbox {
rx: self.rx.clone(),
inner: self.inner.clone(),
}
}
}

impl<A: Actor> Inbox<A> {
pub(crate) fn is_empty(&self) -> bool {
self.rx.is_empty()
self.inner.rx.is_empty()
}

pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> {
self.rx.recv().await
self.inner.rx.recv().await
}

pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> {
self.rx.recv_high_priority().await
self.inner.rx.recv_high_priority().await
}

pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> {
self.rx.try_recv()
self.inner.rx.try_recv()
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> {
loop {
match self.rx.recv().await {
match self.inner.rx.recv().await {
Ok(mut envelope) => {
if let Some(msg) = envelope.message_typed() {
return Ok(msg);
Expand All @@ -362,7 +367,8 @@ impl<A: Actor> Inbox<A> {
/// Warning this iterator might never be exhausted if there is a living
/// mailbox associated to it.
pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
self.rx
self.inner
.rx
.drain_low_priority()
.into_iter()
.map(|mut envelope| envelope.message())
Expand All @@ -375,14 +381,28 @@ impl<A: Actor> Inbox<A> {
/// Warning this iterator might never be exhausted if there is a living
/// mailbox associated to it.
pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> {
self.rx
self.inner
.rx
.drain_low_priority()
.into_iter()
.flat_map(|mut envelope| envelope.message_typed())
.collect()
}
}

fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> {
static INBOX_GAUGE: std::sync::OnceLock<IntGauge> = OnceLock::new();
let gauge = INBOX_GAUGE.get_or_init(|| {
quickwit_common::metrics::new_gauge(
"inboxes_count",
"overall count of actors",
"actor",
&[],
)
});
GaugeGuard::from_gauge(gauge)
}

pub(crate) fn create_mailbox<A: Actor>(
actor_name: String,
queue_capacity: QueueCapacity,
Expand All @@ -398,7 +418,13 @@ pub(crate) fn create_mailbox<A: Actor>(
}),
ref_count,
};
let inbox = Inbox { rx: Arc::new(rx) };
let inner = InboxInner {
rx,
_inboxes_count_gauge_guard: get_actor_inboxes_count_gauge_guard(),
};
let inbox = Inbox {
inner: Arc::new(inner),
};
(mailbox, inbox)
}

Expand Down

0 comments on commit 914a196

Please sign in to comment.