Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: introduce backpressure from consensus to the networking layer by using bounded channels #2340

Merged
merged 32 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rs/http_endpoints/fuzz/fuzz_targets/execute_call_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ fn new_call_service(
let ingress_pool_throttler = MockIngressPoolThrottler::new(throttler_rx);

let ingress_throttler = Arc::new(RwLock::new(ingress_pool_throttler));
#[allow(clippy::disallowed_methods)]
let (ingress_tx, _ingress_rx) = tokio::sync::mpsc::unbounded_channel();
let (ingress_tx, _ingress_rx) = tokio::sync::mpsc::channel(100);

let sig_verifier = Arc::new(temp_crypto_component_with_fake_registry(node_test_id(1)));
let call_handler = IngressValidatorBuilder::builder(
Expand Down
13 changes: 7 additions & 6 deletions rs/http_endpoints/public/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use ic_validator::HttpRequestVerifier;
use std::convert::TryInto;
use std::sync::Mutex;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::Sender;
use tower::ServiceExt;

pub struct IngressValidatorBuilder {
Expand All @@ -46,7 +46,7 @@ pub struct IngressValidatorBuilder {
registry_client: Arc<dyn RegistryClient>,
ingress_filter: Arc<Mutex<IngressFilterService>>,
ingress_throttler: Arc<RwLock<dyn IngressPoolThrottler + Send + Sync>>,
ingress_tx: UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
ingress_tx: Sender<UnvalidatedArtifactMutation<SignedIngress>>,
}

impl IngressValidatorBuilder {
Expand All @@ -58,7 +58,7 @@ impl IngressValidatorBuilder {
ingress_verifier: Arc<dyn IngressSigVerifier + Send + Sync>,
ingress_filter: Arc<Mutex<IngressFilterService>>,
ingress_throttler: Arc<RwLock<dyn IngressPoolThrottler + Send + Sync>>,
ingress_tx: UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
ingress_tx: Sender<UnvalidatedArtifactMutation<SignedIngress>>,
) -> Self {
Self {
log,
Expand Down Expand Up @@ -167,7 +167,7 @@ pub struct IngressValidator {
validator: Arc<dyn HttpRequestVerifier<SignedIngressContent, RegistryRootOfTrustProvider>>,
ingress_filter: Arc<Mutex<IngressFilterService>>,
ingress_throttler: Arc<RwLock<dyn IngressPoolThrottler + Send + Sync>>,
ingress_tx: UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
ingress_tx: Sender<UnvalidatedArtifactMutation<SignedIngress>>,
}

impl IngressValidator {
Expand Down Expand Up @@ -276,7 +276,7 @@ impl IngressValidator {
}

pub struct IngressMessageSubmitter {
ingress_tx: UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
ingress_tx: Sender<UnvalidatedArtifactMutation<SignedIngress>>,
node_id: NodeId,
message: SignedIngress,
}
Expand All @@ -289,7 +289,7 @@ impl IngressMessageSubmitter {

/// Attempts to submit the ingress message to the ingress pool.
/// An [`HttpError`] is returned if P2P is not running.
pub(crate) fn try_submit(self) -> Result<(), HttpError> {
rumenov marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) async fn submit(self) -> Result<(), HttpError> {
let Self {
ingress_tx,
node_id,
Expand All @@ -300,6 +300,7 @@ impl IngressMessageSubmitter {
// no receiver for the ingress message.
let send_ingress_to_p2p_failed = ingress_tx
.send(UnvalidatedArtifactMutation::Insert((message, node_id)))
.await
.is_err();

if send_ingress_to_p2p_failed {
Expand Down
2 changes: 1 addition & 1 deletion rs/http_endpoints/public/src/call/call_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub(super) async fn handler(

let message_id = ingress_submitter.message_id();

ingress_submitter.try_submit()?;
ingress_submitter.submit().await?;

// We spawn a task to register the certification time of the message.
// The subscriber in the spawned task records the certification time of the message
Expand Down
2 changes: 1 addition & 1 deletion rs/http_endpoints/public/src/call/call_v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async fn call_sync_v3(
}
};

let ingres_submission = ingress_submitter.try_submit();
let ingres_submission = ingress_submitter.submit().await;

if let Err(ingress_submission) = ingres_submission {
return CallV3Response::HttpError(ingress_submission);
Expand Down
4 changes: 2 additions & 2 deletions rs/http_endpoints/public/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
sync::{
mpsc::{Receiver, UnboundedSender},
mpsc::{Receiver, Sender},
watch, OnceCell,
},
time::{sleep, timeout, Instant},
Expand Down Expand Up @@ -283,7 +283,7 @@ pub fn start_server(
ingress_filter: IngressFilterService,
query_execution_service: QueryExecutionService,
ingress_throttler: Arc<RwLock<dyn IngressPoolThrottler + Send + Sync>>,
ingress_tx: UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
ingress_tx: Sender<UnvalidatedArtifactMutation<SignedIngress>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
query_signer: Arc<dyn BasicSigner<QueryResponseHash> + Send + Sync>,
registry_client: Arc<dyn RegistryClient>,
Expand Down
6 changes: 3 additions & 3 deletions rs/http_endpoints/public/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use std::{collections::BTreeMap, convert::Infallible, net::SocketAddr, sync::Arc
use tokio::{
net::{TcpSocket, TcpStream},
sync::{
mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver},
mpsc::{channel, Receiver, Sender},
watch, OnceCell,
},
};
Expand Down Expand Up @@ -365,7 +365,7 @@ mock! {

pub struct HttpEndpointHandles {
pub ingress_filter: IngressFilterHandle,
pub ingress_rx: UnboundedReceiver<UnvalidatedArtifactMutation<SignedIngress>>,
pub ingress_rx: Receiver<UnvalidatedArtifactMutation<SignedIngress>>,
pub query_execution: QueryExecutionHandle,
pub terminal_state_ingress_messages: Sender<(MessageId, Height)>,
pub certified_height_watcher: watch::Sender<Height>,
Expand Down Expand Up @@ -468,7 +468,7 @@ impl HttpEndpointBuilder {
let crypto = Arc::new(CryptoReturningOk::default());

#[allow(clippy::disallowed_methods)]
let (ingress_tx, ingress_rx) = unbounded_channel();
let (ingress_tx, ingress_rx) = channel(1000);

let log = no_op_logger();

Expand Down
21 changes: 8 additions & 13 deletions rs/p2p/artifact_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use std::{
time::Duration,
};
use tokio::{
sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
sync::mpsc::{channel, Receiver, Sender},
time::timeout,
};
use tracing::instrument;

type ArtifactEventSender<Artifact> = UnboundedSender<UnvalidatedArtifactMutation<Artifact>>;
type ArtifactEventSender<Artifact> = Sender<UnvalidatedArtifactMutation<Artifact>>;

/// Metrics for a client artifact processor.
struct ArtifactProcessorMetrics {
Expand Down Expand Up @@ -139,14 +139,9 @@ pub fn run_artifact_processor<Artifact: IdentifiableArtifact>(
send_advert: Sender<ArtifactTransmit<Artifact>>,
initial_artifacts: Vec<Artifact>,
) -> (Box<dyn JoinGuard>, ArtifactEventSender<Artifact>) {
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (sender, receiver) = unbounded_channel();
// We picked fairly big value here so effectively the channel is unbounded so no wakers run.
// The number should be the same order of magnitute as the max number of validated artifacts (e.g. the max slot table size)
let (sender, receiver) = channel(50000);
rumenov marked this conversation as resolved.
Show resolved Hide resolved
let shutdown = Arc::new(AtomicBool::new(false));

// Spawn the processor thread
Expand Down Expand Up @@ -182,7 +177,7 @@ fn process_messages<Artifact: IdentifiableArtifact + 'static>(
time_source: Arc<dyn TimeSource>,
client: Box<dyn ArtifactProcessor<Artifact>>,
send_advert: Sender<ArtifactTransmit<Artifact>>,
mut receiver: UnboundedReceiver<UnvalidatedArtifactMutation<Artifact>>,
mut receiver: Receiver<UnvalidatedArtifactMutation<Artifact>>,
mut metrics: ArtifactProcessorMetrics,
shutdown: Arc<AtomicBool>,
) {
Expand Down Expand Up @@ -255,7 +250,7 @@ pub fn create_ingress_handlers<
>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Sender<UnvalidatedArtifactMutation<SignedIngress>>,
Box<dyn JoinGuard>,
) {
let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler);
Expand All @@ -281,7 +276,7 @@ pub fn create_artifact_handler<
pool: Arc<RwLock<Pool>>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
Sender<UnvalidatedArtifactMutation<Artifact>>,
Box<dyn JoinGuard>,
) {
let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect();
Expand Down
6 changes: 3 additions & 3 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf;
use tokio::{
runtime::Handle,
sync::{
mpsc::{Receiver, UnboundedSender},
mpsc::{Receiver, Sender},
watch,
},
};
Expand Down Expand Up @@ -55,7 +55,7 @@ impl ConsensusManagerBuilder {
>(
&mut self,
outbound_artifacts_rx: Receiver<ArtifactTransmit<Artifact>>,
inbound_artifacts_tx: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
inbound_artifacts_tx: Sender<UnvalidatedArtifactMutation<Artifact>>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) {
Expand Down Expand Up @@ -119,7 +119,7 @@ fn start_consensus_manager<Artifact, WireArtifact, Assembler>(
adverts_to_send: Receiver<ArtifactTransmit<Artifact>>,
// Adverts received from peers
adverts_received: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
assembler: Assembler,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
Expand Down
16 changes: 8 additions & 8 deletions rs/p2p/consensus_manager/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::{
runtime::Handle,
select,
sync::{
mpsc::{Receiver, Sender, UnboundedSender},
mpsc::{Receiver, Sender},
watch,
},
task::JoinSet,
Expand Down Expand Up @@ -187,7 +187,7 @@ pub(crate) struct ConsensusManagerReceiver<

// Receive side:
adverts_received: Receiver<ReceivedAdvert>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
artifact_assembler: Assembler,

slot_table: HashMap<NodeId, HashMap<SlotNumber, SlotEntry<WireArtifact::Id>>>,
Expand Down Expand Up @@ -220,7 +220,7 @@ where
rt_handle: Handle,
adverts_received: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
artifact_assembler: Assembler,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Shutdown {
Expand Down Expand Up @@ -462,7 +462,7 @@ where
// Only first peer for specific artifact ID is considered for push
mut artifact: Option<(WireArtifact, NodeId)>,
mut peer_rx: watch::Receiver<PeerCounter>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
mut artifact_assembler: Assembler,
metrics: ConsensusManagerMetrics,
cancellation_token: CancellationToken,
Expand Down Expand Up @@ -606,7 +606,7 @@ mod tests {
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{artifact::IdentifiableArtifact, RegistryVersion};
use ic_types_test_utils::ids::{NODE_1, NODE_2};
use tokio::{sync::mpsc::UnboundedReceiver, time::timeout};
use tokio::time::timeout;
use tower::util::ServiceExt;

use super::*;
Expand All @@ -616,7 +616,7 @@ mod tests {
struct ReceiverManagerBuilder {
// Adverts received from peers
adverts_received: Receiver<(SlotUpdate<U64Artifact>, NodeId, ConnId)>,
sender: UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<U64Artifact>>,
artifact_assembler: MockArtifactAssembler,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
Expand All @@ -632,7 +632,7 @@ mod tests {
>;

struct Channels {
unvalidated_artifact_receiver: UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
unvalidated_artifact_receiver: Receiver<UnvalidatedArtifactMutation<U64Artifact>>,
}

impl ReceiverManagerBuilder {
Expand All @@ -648,7 +648,7 @@ mod tests {

fn new() -> Self {
let (_, adverts_received) = tokio::sync::mpsc::channel(100);
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel();
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::channel(1000);
let (_, topology_watcher) = watch::channel(SubnetTopology::default());
let artifact_assembler =
Self::make_mock_artifact_assembler_with_clone(MockArtifactAssembler::default);
Expand Down
2 changes: 1 addition & 1 deletion rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ pub fn start_test_processor(
) -> (
Box<dyn JoinGuard>,
mpsc::Receiver<ArtifactTransmit<U64Artifact>>,
mpsc::UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
mpsc::Sender<UnvalidatedArtifactMutation<U64Artifact>>,
) {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
let time_source = Arc::new(SysTimeSource::new());
Expand Down
4 changes: 1 addition & 3 deletions rs/pocket_ic_server/src/pocket_ic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1671,9 +1671,7 @@ impl Operation for CallRequest {
Err(e) => OpOut::Error(PocketIcError::RequestRoutingError(e)),
Ok(subnet) => {
let node = &subnet.nodes[0];
#[allow(clippy::disallowed_methods)]
let (s, mut r) =
mpsc::unbounded_channel::<UnvalidatedArtifactMutation<SignedIngress>>();
let (s, mut r) = mpsc::channel::<UnvalidatedArtifactMutation<SignedIngress>>(1);
let ingress_filter = subnet.ingress_filter.clone();

let ingress_validator = IngressValidatorBuilder::builder(
Expand Down
6 changes: 3 additions & 3 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use std::{
str::FromStr,
sync::{Arc, Mutex, RwLock},
};
use tokio::sync::{mpsc::UnboundedSender, watch};
use tokio::sync::{mpsc::Sender, watch};
use tower_http::trace::TraceLayer;

/// [IC-1718]: Whether the `hashes-in-blocks` feature is enabled. If the flag is set to `true`, we
Expand Down Expand Up @@ -123,7 +123,7 @@ pub fn setup_consensus_and_p2p(
max_certified_height_tx: watch::Sender<Height>,
) -> (
Arc<RwLock<IngressPoolImpl>>,
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Sender<UnvalidatedArtifactMutation<SignedIngress>>,
Vec<Box<dyn JoinGuard>>,
) {
let consensus_pool_cache = consensus_pool.read().unwrap().get_cache();
Expand Down Expand Up @@ -244,7 +244,7 @@ fn start_consensus(
max_certified_height_tx: watch::Sender<Height>,
) -> (
Arc<RwLock<IngressPoolImpl>>,
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Sender<UnvalidatedArtifactMutation<SignedIngress>>,
Vec<Box<dyn JoinGuard>>,
ConsensusManagerBuilder,
) {
Expand Down
4 changes: 2 additions & 2 deletions rs/replica/src/setup_ic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use ic_types::{
use ic_xnet_payload_builder::XNetPayloadBuilderImpl;
use std::sync::{Arc, RwLock};
use tokio::sync::{
mpsc::{channel, UnboundedSender},
mpsc::{channel, Sender},
watch, OnceCell,
};

Expand Down Expand Up @@ -75,7 +75,7 @@ pub fn construct_ic_stack(
// TODO: remove next three return values since they are used only in tests
Arc<dyn StateReader<State = ReplicatedState>>,
QueryExecutionService,
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Sender<UnvalidatedArtifactMutation<SignedIngress>>,
Vec<Box<dyn JoinGuard>>,
XNetEndpoint,
)> {
Expand Down
Loading
Loading