Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

fix: Make metered channel accounting take unused dropped Permits into account #849

Merged
merged 3 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, Sequen
// Re-export SingleExecutor as a convenience adapter.
pub use crate::core::SingleExecutor;

/// Default inter-task channel size.
pub const DEFAULT_CHANNEL_SIZE: usize = 1_000;

/// Convenience type representing a serialized transaction.
pub type SerializedTransaction = Vec<u8>;

Expand Down Expand Up @@ -130,7 +127,7 @@ impl Executor {
let metrics = ExecutorMetrics::new(registry);

let (tx_executor, rx_executor) =
metered_channel::channel(DEFAULT_CHANNEL_SIZE, &metrics.tx_executor);
metered_channel::channel(primary::CHANNEL_CAPACITY, &metrics.tx_executor);

// Ensure there is a single consensus client modifying the execution state.
ensure!(
Expand Down
4 changes: 2 additions & 2 deletions test_utils/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arc_swap::ArcSwap;
use config::{Committee, Parameters, SharedCommittee, SharedWorkerCache, WorkerId};
use crypto::KeyPair;
use crypto::PublicKey;
use executor::{SerializedTransaction, SingleExecutor, SubscriberResult, DEFAULT_CHANNEL_SIZE};
use executor::{SerializedTransaction, SingleExecutor, SubscriberResult};
use fastcrypto::traits::KeyPair as _;
use itertools::Itertools;
use multiaddr::Multiaddr;
Expand Down Expand Up @@ -363,7 +363,7 @@ impl PrimaryNodeDetails {
.await
.unwrap();

let (tx, _) = tokio::sync::broadcast::channel(DEFAULT_CHANNEL_SIZE);
let (tx, _) = tokio::sync::broadcast::channel(primary::CHANNEL_CAPACITY);
let transactions_sender = tx.clone();
// spawn a task to listen on the committed transactions
// and translate to a mpmc channel
Expand Down
46 changes: 39 additions & 7 deletions types/src/metered_channel.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
#![allow(dead_code)] // TODO: complete tests - This kinda sorta facades the whole tokio::mpsc::{Sender, Receiver}: without tests, this will be fragile to maintain.
use futures::{FutureExt, Stream};
use futures::{FutureExt, Stream, TryFutureExt};
use prometheus::IntGauge;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{
self,
error::{SendError, TryRecvError, TrySendError},
Permit,
};

/// An [`mpsc::Sender`](tokio::sync::mpsc::Sender) with an [`IntGauge`]
Expand Down Expand Up @@ -80,11 +79,44 @@ impl<T> Receiver<T> {

impl<T> Unpin for Receiver<T> {}

/// A newtype for an `mpsc::Permit` which allows us to inject gauge accounting
/// in the case the permit is dropped w/o sending
pub struct Permit<'a, T> {
permit: Option<mpsc::Permit<'a, T>>,
gauge_ref: &'a IntGauge,
}

impl<'a, T> Permit<'a, T> {
pub fn new(permit: mpsc::Permit<'a, T>, gauge_ref: &'a IntGauge) -> Permit<'a, T> {
Permit {
permit: Some(permit),
gauge_ref,
}
}

pub fn send(mut self, value: T) {
let sender = self.permit.take().expect("Permit invariant violated!");
sender.send(value);
// skip the drop logic, see https://github.com/tokio-rs/tokio/blob/a66884a2fb80d1180451706f3c3e006a3fdcb036/tokio/src/sync/mpsc/bounded.rs#L1155-L1163
std::mem::forget(self);
}
}

impl<'a, T> Drop for Permit<'a, T> {
fn drop(&mut self) {
// in the case the permit is dropped without sending, we still want to decrease the occupancy of the channel
self.gauge_ref.dec()
}
}

impl<T> Sender<T> {
/// Sends a value, waiting until there is capacity.
/// Increments the gauge in case of a successful `send`.
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
self.inner.send(value).inspect(|_| self.gauge.inc()).await
self.inner
.send(value)
.inspect_ok(|_| self.gauge.inc())
.await
}

/// Completes when the receiver has dropped.
Expand Down Expand Up @@ -122,10 +154,10 @@ impl<T> Sender<T> {
.reserve()
// remove this unsightly hack once https://github.com/rust-lang/rust/issues/91345 is resolved
.map(|val| {
if val.is_ok() {
val.map(|permit| {
self.gauge.inc();
}
val
Permit::new(permit, &self.gauge)
})
})
.await
}
Expand All @@ -137,7 +169,7 @@ impl<T> Sender<T> {
self.inner.try_reserve().map(|val| {
// remove this unsightly hack once https://github.com/rust-lang/rust/issues/91345 is resolved
self.gauge.inc();
val
Permit::new(val, &self.gauge)
})
}

Expand Down
17 changes: 16 additions & 1 deletion types/tests/metered_channel_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,28 @@ fn test_reserve() {
let permit = block_on(tx.reserve()).unwrap();
assert_eq!(counter.get(), 1);

permit.send(42);
permit.send(item);
let received_item = block_on(rx.recv()).unwrap();

assert_eq!(received_item, item);
assert_eq!(counter.get(), 0);
}

#[test]
fn test_reserve_and_drop() {
let counter = IntGauge::new("TEST_COUNTER", "test").unwrap();
let (tx, _rx) = metered_channel::channel::<i32>(8, &counter);

assert_eq!(counter.get(), 0);

let permit = block_on(tx.reserve()).unwrap();
assert_eq!(counter.get(), 1);

std::mem::drop(permit);

assert_eq!(counter.get(), 0);
}

#[test]
fn test_send_backpressure() {
let waker = noop_waker();
Expand Down