diff --git a/applications/tari_base_node/src/commands/command/status.rs b/applications/tari_base_node/src/commands/command/status.rs index b6a4cb32b5..84a0c389be 100644 --- a/applications/tari_base_node/src/commands/command/status.rs +++ b/applications/tari_base_node/src/commands/command/status.rs @@ -131,7 +131,7 @@ impl CommandContext { ), ); - match self.comms.listening_info().liveness_status() { + match self.comms.liveness_status() { LivenessStatus::Disabled => {}, LivenessStatus::Checking => { status_line.add("⏳️️"); diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 1279e97d1b..c7d21db50e 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -26,7 +26,7 @@ use log::*; use tari_shutdown::ShutdownSignal; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{broadcast, mpsc}, + sync::{broadcast, mpsc, watch}, }; use super::{CommsBuilderError, CommsShutdown}; @@ -37,6 +37,8 @@ use crate::{ ConnectionManagerRequest, ConnectionManagerRequester, ListenerInfo, + LivenessCheck, + LivenessStatus, }, connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester}, multiaddr::Multiaddr, @@ -132,6 +134,7 @@ impl UnspawnedCommsNode { } /// Spawn a new node using the specified [Transport](crate::transports::Transport). + #[allow(clippy::too_many_lines)] pub async fn spawn_with_transport(self, transport: TTransport) -> Result where TTransport: Transport + Unpin + Send + Sync + Clone + 'static, @@ -187,8 +190,8 @@ impl UnspawnedCommsNode { let noise_config = NoiseConfig::new(node_identity.clone()); let mut connection_manager = ConnectionManager::new( - connection_manager_config, - transport, + connection_manager_config.clone(), + transport.clone(), noise_config, dial_backoff, connection_manager_request_rx, @@ -235,6 +238,19 @@ impl UnspawnedCommsNode { node_identity.public_address() ); + // Spawn liveness check now that we have the final address + let liveness_watch = connection_manager_config + .liveness_self_check_interval + .map(|interval| { + LivenessCheck::spawn( + transport, + node_identity.public_address(), + interval, + shutdown_signal.clone(), + ) + }) + .unwrap_or_else(|| watch::channel(LivenessStatus::Disabled).1); + Ok(CommsNode { shutdown_signal, connection_manager_requester, @@ -242,6 +258,7 @@ impl UnspawnedCommsNode { listening_info, node_identity, peer_manager, + liveness_watch, hidden_service, complete_signals: ext_context.drain_complete_signals(), }) @@ -286,6 +303,8 @@ pub struct CommsNode { peer_manager: Arc, /// The bind addresses of the listener(s) listening_info: ListenerInfo, + /// Current liveness status + liveness_watch: watch::Receiver, /// `Some` if the comms node is configured to run via a hidden service, otherwise `None` hidden_service: Option, /// The 'reciprocal' shutdown signals for each comms service @@ -328,6 +347,11 @@ impl CommsNode { &self.listening_info } + /// Returns the current liveness status + pub fn liveness_status(&self) -> LivenessStatus { + *self.liveness_watch.borrow() + } + /// Return the Ip/Tcp address that this node is listening on pub fn hidden_service(&self) -> Option<&tor::HiddenService> { self.hidden_service.as_ref() diff --git a/comms/core/src/connection_manager/listener.rs b/comms/core/src/connection_manager/listener.rs index 3df50b8696..15e4a89104 100644 --- a/comms/core/src/connection_manager/listener.rs +++ b/comms/core/src/connection_manager/listener.rs @@ -36,7 +36,7 @@ use log::*; use tari_shutdown::{oneshot_trigger, oneshot_trigger::OneshotTrigger, ShutdownSignal}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - sync::{mpsc, watch}, + sync::mpsc, time, }; use tokio_stream::StreamExt; @@ -53,7 +53,7 @@ use super::{ use crate::{ bounded_executor::BoundedExecutor, connection_manager::{ - liveness::{LivenessCheck, LivenessSession, LivenessStatus}, + liveness::LivenessSession, metrics, wire_mode::{WireMode, LIVENESS_WIRE_MODE}, }, @@ -83,7 +83,7 @@ pub struct PeerListener { node_identity: Arc, our_supported_protocols: Vec, liveness_session_count: Arc, - on_listening: OneshotTrigger), ConnectionManagerError>>, + on_listening: OneshotTrigger>, } impl PeerListener @@ -121,10 +121,7 @@ where /// in binding the listener socket // This returns an impl Future and is not async because we want to exclude &self from the future so that it has a // 'static lifetime as well as to flatten the oneshot result for ergonomics - pub fn on_listening( - &self, - ) -> impl Future), ConnectionManagerError>> + 'static - { + pub fn on_listening(&self) -> impl Future> + 'static { let signal = self.on_listening.to_signal(); signal.map(|r| r.ok_or(ConnectionManagerError::ListenerOneshotCancelled)?) } @@ -135,7 +132,7 @@ where self } - pub async fn listen(self) -> Result<(Multiaddr, watch::Receiver), ConnectionManagerError> { + pub async fn listen(self) -> Result { let on_listening = self.on_listening(); runtime::current().spawn(self.run()); on_listening.await @@ -148,9 +145,7 @@ where Ok((mut inbound, address)) => { info!(target: LOG_TARGET, "Listening for peer connections on '{}'", address); - let liveness_watch = self.spawn_liveness_check(); - - self.on_listening.broadcast(Ok((address, liveness_watch))); + self.on_listening.broadcast(Ok(address)); loop { tokio::select! { @@ -234,21 +229,6 @@ where }); } - fn spawn_liveness_check(&self) -> watch::Receiver { - match self.config.liveness_self_check_interval { - Some(interval) => LivenessCheck::spawn( - self.transport.clone(), - self.node_identity.public_address(), - interval, - self.shutdown_signal.clone(), - ), - None => { - let (_, rx) = watch::channel(LivenessStatus::Disabled); - rx - }, - } - } - async fn spawn_listen_task(&self, mut socket: TTransport::Output, peer_addr: Multiaddr) { let node_identity = self.node_identity.clone(); let peer_manager = self.peer_manager.clone(); diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 6e492ff187..4af961713d 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -28,7 +28,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal}; use time::Duration; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{broadcast, mpsc, oneshot, watch}, + sync::{broadcast, mpsc, oneshot}, task, time, }; @@ -43,7 +43,7 @@ use super::{ }; use crate::{ backoff::Backoff, - connection_manager::{liveness::LivenessStatus, metrics, ConnectionDirection, ConnectionId}, + connection_manager::{metrics, ConnectionDirection, ConnectionId}, multiplexing::Substream, noise::NoiseConfig, peer_manager::{NodeId, NodeIdentity, PeerManagerError}, @@ -149,7 +149,6 @@ impl Default for ConnectionManagerConfig { pub struct ListenerInfo { bind_address: Multiaddr, aux_bind_address: Option, - liveness_watch: watch::Receiver, } impl ListenerInfo { @@ -163,17 +162,6 @@ impl ListenerInfo { pub fn auxiliary_bind_address(&self) -> Option<&Multiaddr> { self.aux_bind_address.as_ref() } - - /// Returns the current liveness status - pub fn liveness_status(&self) -> LivenessStatus { - *self.liveness_watch.borrow() - } - - /// Waits for liveness status to change from the last time the value was checked. - pub async fn liveness_status_changed(&mut self) -> Option { - self.liveness_watch.changed().await.ok()?; - Some(*self.liveness_watch.borrow()) - } } /// The actor responsible for connection management. @@ -346,17 +334,16 @@ where listener.set_supported_protocols(self.protocols.get_supported_protocols()); let mut listener_info = match listener.listen().await { - Ok((bind_address, liveness_watch)) => ListenerInfo { + Ok(bind_address) => ListenerInfo { bind_address, aux_bind_address: None, - liveness_watch, }, Err(err) => return Err(err), }; if let Some(mut listener) = self.aux_listener.take() { listener.set_supported_protocols(self.protocols.get_supported_protocols()); - let (addr, _) = listener.listen().await?; + let addr = listener.listen().await?; debug!(target: LOG_TARGET, "Aux TCP listener bound to address {}", addr); listener_info.aux_bind_address = Some(addr); } diff --git a/comms/core/src/connection_manager/mod.rs b/comms/core/src/connection_manager/mod.rs index 6ae616669a..82f3e981a2 100644 --- a/comms/core/src/connection_manager/mod.rs +++ b/comms/core/src/connection_manager/mod.rs @@ -52,6 +52,7 @@ mod peer_connection; pub use peer_connection::{ConnectionId, NegotiatedSubstream, PeerConnection, PeerConnectionRequest}; mod liveness; +pub(crate) use liveness::LivenessCheck; pub use liveness::LivenessStatus; mod wire_mode; diff --git a/comms/core/src/connection_manager/tests/listener_dialer.rs b/comms/core/src/connection_manager/tests/listener_dialer.rs index c2b71380bb..5e0b26a92a 100644 --- a/comms/core/src/connection_manager/tests/listener_dialer.rs +++ b/comms/core/src/connection_manager/tests/listener_dialer.rs @@ -66,7 +66,7 @@ async fn listen() -> Result<(), Box> { shutdown.to_signal(), ); - let (mut bind_addr, _) = listener.listen().await?; + let mut bind_addr = listener.listen().await?; unpack_enum!(Protocol::Memory(port) = bind_addr.pop().unwrap()); assert!(port > 0); @@ -103,7 +103,7 @@ async fn smoke() { listener.set_supported_protocols(supported_protocols.clone()); // Get the listening address of the peer - let (address, _) = listener.listen().await.unwrap(); + let address = listener.listen().await.unwrap(); let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let noise_config2 = NoiseConfig::new(node_identity2.clone()); @@ -207,7 +207,7 @@ async fn banned() { listener.set_supported_protocols(supported_protocols.clone()); // Get the listener address of the peer - let (address, _) = listener.listen().await.unwrap(); + let address = listener.listen().await.unwrap(); let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); // The listener has banned the dialer peer