Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add counter for bounded channel sends that block. (#5490)
Browse files Browse the repository at this point in the history
* Add counter for bounded channel sends that block.

Signed-off-by: Andrei Sandu <[email protected]>

* fix typos

Signed-off-by: Andrei Sandu <[email protected]>

* Fix bounded sent metric

Signed-off-by: Andrei Sandu <[email protected]>

* refactor a bit and test

Signed-off-by: Andrei Sandu <[email protected]>

* Return disconnect errors early.

Signed-off-by: Andrei Sandu <[email protected]>

* future proof error handling

Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim authored May 17, 2022
1 parent 21323b7 commit b79dccc
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 9 deletions.
28 changes: 21 additions & 7 deletions node/metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,23 @@ impl<T> MeteredSender<T> {
where
Self: Unpin,
{
let msg = self.prepare_with_tof(msg);
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
match self.try_send(msg) {
Err(send_err) => {
if !send_err.is_full() {
return Err(send_err.into_send_error())
}

let msg = send_err.into_inner();
self.meter.note_sent();
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
},
_ => Ok(()),
}
}

/// Attempt to send message or fail immediately.
Expand All @@ -174,6 +184,10 @@ impl<T> MeteredSender<T> {
) -> result::Result<(), mpsc::TrySendError<MaybeTimeOfFlight<T>>> {
let msg = self.prepare_with_tof(msg);
self.inner.try_send(msg).map_err(|e| {
if e.is_full() {
// Count bounded channel sends that block.
self.meter.note_blocked();
}
self.meter.retract_sent();
e
})
Expand Down
10 changes: 10 additions & 0 deletions node/metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct Meter {
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
received: Arc<AtomicUsize>,
// Number of times senders blocked while sending messages to a subsystem.
blocked: Arc<AtomicUsize>,
// Atomic ringbuffer of the last 50 time of flight values
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}
Expand All @@ -51,6 +53,7 @@ impl std::default::Default for Meter {
Self {
sent: Arc::new(AtomicUsize::new(0)),
received: Arc::new(AtomicUsize::new(0)),
blocked: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
}
}
Expand All @@ -65,6 +68,8 @@ pub struct Readout {
pub sent: usize,
/// The amount of messages received on the channel, in aggregate.
pub received: usize,
/// How many times the caller blocked when sending messages.
pub blocked: usize,
/// Time of flight in micro seconds (us)
pub tof: Vec<CoarseDuration>,
}
Expand All @@ -77,6 +82,7 @@ impl Meter {
Readout {
sent: self.sent.load(Ordering::Relaxed),
received: self.received.load(Ordering::Relaxed),
blocked: self.blocked.load(Ordering::Relaxed),
tof: {
let mut acc = Vec::with_capacity(self.tof.len());
while let Some(value) = self.tof.pop() {
Expand All @@ -99,6 +105,10 @@ impl Meter {
self.received.fetch_add(1, Ordering::Relaxed);
}

fn note_blocked(&self) {
self.blocked.fetch_add(1, Ordering::Relaxed);
}

fn note_time_of_flight(&self, tof: CoarseDuration) {
let _ = self.tof.force_push(tof);
}
Expand Down
25 changes: 23 additions & 2 deletions node/metered-channel/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ fn try_send_try_next() {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => {
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
Expand Down Expand Up @@ -127,3 +127,24 @@ fn failed_send_does_not_inc_sent() {
assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. });
});
}

#[test]
fn blocked_send_is_metered() {
let (mut bounded_sender, mut bounded_receiver) = channel::<Msg>(1);

block_on(async move {
assert!(bounded_sender.send(Msg::default()).await.is_ok());
assert!(bounded_sender.send(Msg::default()).await.is_ok());
assert!(bounded_sender.try_send(Msg::default()).is_err());

assert_matches!(
bounded_sender.meter().read(),
Readout { sent: 2, received: 0, blocked: 1, .. }
);
bounded_receiver.try_next().unwrap();
assert_matches!(
bounded_receiver.meter().read(),
Readout { sent: 2, received: 1, blocked: 1, .. }
);
});
}
16 changes: 16 additions & 0 deletions node/overseer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct MetricsInner {
to_subsystem_bounded_tof: prometheus::HistogramVec,
to_subsystem_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_received: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_blocked: prometheus::GaugeVec<prometheus::U64>,

to_subsystem_unbounded_tof: prometheus::HistogramVec,
to_subsystem_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
Expand Down Expand Up @@ -91,6 +92,11 @@ impl Metrics {
.with_label_values(&[name])
.set(readouts.bounded.received as u64);

metrics
.to_subsystem_bounded_blocked
.with_label_values(&[name])
.set(readouts.bounded.blocked as u64);

metrics
.to_subsystem_unbounded_sent
.with_label_values(&[name])
Expand Down Expand Up @@ -180,6 +186,16 @@ impl MetricsTrait for Metrics {
)?,
registry,
)?,
to_subsystem_bounded_blocked: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_parachain_subsystem_bounded_blocked",
"Number of times senders blocked while sending messages to a subsystem",
),
&["subsystem_name"],
)?,
registry,
)?,
to_subsystem_unbounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
Expand Down

0 comments on commit b79dccc

Please sign in to comment.