Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add counter for bounded channel sends that block. #5490

Merged
merged 7 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions node/metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,23 @@ impl<T> MeteredSender<T> {
where
Self: Unpin,
{
let msg = self.prepare_with_tof(msg);
eskimor marked this conversation as resolved.
Show resolved Hide resolved
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
match self.try_send(msg) {
Err(send_err) => {
if send_err.is_disconnected() {
sandreim marked this conversation as resolved.
Show resolved Hide resolved
return Err(send_err.into_send_error())
}

let msg = send_err.into_inner();
self.meter.note_sent();
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
},
_ => Ok(()),
}
}

/// Attempt to send message or fail immediately.
Expand All @@ -174,6 +184,10 @@ impl<T> MeteredSender<T> {
) -> result::Result<(), mpsc::TrySendError<MaybeTimeOfFlight<T>>> {
let msg = self.prepare_with_tof(msg);
self.inner.try_send(msg).map_err(|e| {
if e.is_full() {
// The channel is full, so this send could not complete immediately: count it as blocked.
self.meter.note_blocked();
}
self.meter.retract_sent();
e
})
Expand Down
10 changes: 10 additions & 0 deletions node/metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct Meter {
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
received: Arc<AtomicUsize>,
// Number of times senders blocked while sending messages to a subsystem.
blocked: Arc<AtomicUsize>,
// Atomic ringbuffer of the most recent time of flight values (capacity 100 when built via `Default`)
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}
Expand All @@ -51,6 +53,7 @@ impl std::default::Default for Meter {
Self {
sent: Arc::new(AtomicUsize::new(0)),
received: Arc::new(AtomicUsize::new(0)),
blocked: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
}
}
Expand All @@ -65,6 +68,8 @@ pub struct Readout {
pub sent: usize,
/// The amount of messages received on the channel, in aggregate.
pub received: usize,
/// How many times the caller blocked when sending messages.
pub blocked: usize,
/// Time of flight values, in microseconds (us)
pub tof: Vec<CoarseDuration>,
}
Expand All @@ -77,6 +82,7 @@ impl Meter {
Readout {
sent: self.sent.load(Ordering::Relaxed),
received: self.received.load(Ordering::Relaxed),
blocked: self.blocked.load(Ordering::Relaxed),
tof: {
let mut acc = Vec::with_capacity(self.tof.len());
while let Some(value) = self.tof.pop() {
Expand All @@ -99,6 +105,10 @@ impl Meter {
self.received.fetch_add(1, Ordering::Relaxed);
}

/// Increments the `blocked` counter by one.
///
/// The counter is bumped with a relaxed atomic add; the previous value that
/// `fetch_add` hands back is not needed and is discarded.
fn note_blocked(&self) {
    let _previous = self.blocked.fetch_add(1, Ordering::Relaxed);
}

/// Stores one time-of-flight sample in the `tof` queue.
///
/// `force_push` never fails: when the queue is already full it makes room by
/// replacing the oldest sample and returns it. That evicted sample (if any)
/// is intentionally thrown away here.
fn note_time_of_flight(&self, tof: CoarseDuration) {
    let _evicted = self.tof.force_push(tof);
}
Expand Down
25 changes: 23 additions & 2 deletions node/metered-channel/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ fn try_send_try_next() {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => {
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
Expand Down Expand Up @@ -127,3 +127,24 @@ fn failed_send_does_not_inc_sent() {
assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. });
});
}

/// Checks that a `try_send` rejected by a bounded channel shows up in the
/// meter's `blocked` count, while `sent` only reflects the successful sends.
#[test]
fn blocked_send_is_metered() {
    let (mut tx, mut rx) = channel::<Msg>(1);

    block_on(async move {
        // The test expects the first two sends to be accepted ...
        for _ in 0..2 {
            assert!(tx.send(Msg::default()).await.is_ok());
        }
        // ... and a third, non-blocking attempt to be refused.
        assert!(tx.try_send(Msg::default()).is_err());

        // Before anything is received: two sent, none received, one blocked.
        let before_receive = tx.meter().read();
        assert_matches!(before_receive, Readout { sent: 2, received: 0, blocked: 1, .. });

        // Receiving one message must bump `received` and leave `blocked` untouched.
        rx.try_next().unwrap();
        let after_receive = rx.meter().read();
        assert_matches!(after_receive, Readout { sent: 2, received: 1, blocked: 1, .. });
    });
}
16 changes: 16 additions & 0 deletions node/overseer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct MetricsInner {
to_subsystem_bounded_tof: prometheus::HistogramVec,
to_subsystem_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_received: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_blocked: prometheus::GaugeVec<prometheus::U64>,

to_subsystem_unbounded_tof: prometheus::HistogramVec,
to_subsystem_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
Expand Down Expand Up @@ -91,6 +92,11 @@ impl Metrics {
.with_label_values(&[name])
.set(readouts.bounded.received as u64);

metrics
.to_subsystem_bounded_blocked
.with_label_values(&[name])
.set(readouts.bounded.blocked as u64);

metrics
.to_subsystem_unbounded_sent
.with_label_values(&[name])
Expand Down Expand Up @@ -180,6 +186,16 @@ impl MetricsTrait for Metrics {
)?,
registry,
)?,
to_subsystem_bounded_blocked: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_parachain_subsystem_bounded_blocked",
"Number of times senders blocked while sending messages to a subsystem",
),
&["subsystem_name"],
)?,
registry,
)?,
to_subsystem_unbounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
Expand Down