fix(comms): spawn liveness check after address is final #4919

Merged
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/commands/command/status.rs
@@ -131,7 +131,7 @@ impl CommandContext {
 ),
 );
 
-match self.comms.listening_info().liveness_status() {
+match self.comms.liveness_status() {
 LivenessStatus::Disabled => {},
 LivenessStatus::Checking => {
 status_line.add("⏳️️");
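This is the only call-site change in the applications: the status command now reads liveness straight off the `CommsNode` handle rather than via `ListenerInfo`. A standalone sketch of the resulting pattern, using stand-in types rather than the real `tari_comms` ones:

```rust
// Stand-ins so the snippet compiles on its own; in the PR, `liveness_status()`
// and `LivenessStatus` come from tari_comms (see comms_node.rs below).
#[derive(Debug, Clone, Copy)]
enum LivenessStatus {
    Disabled,
    Checking,
}

struct CommsHandle {
    status: LivenessStatus,
}

impl CommsHandle {
    fn liveness_status(&self) -> LivenessStatus {
        self.status
    }
}

fn main() {
    let comms = CommsHandle { status: LivenessStatus::Checking };
    // Same shape as the status command's match, minus the status-line plumbing.
    match comms.liveness_status() {
        LivenessStatus::Disabled => println!("liveness self-check disabled"),
        LivenessStatus::Checking => println!("liveness self-check in progress"),
    }
}
```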
30 changes: 27 additions & 3 deletions comms/core/src/builder/comms_node.rs
@@ -26,7 +26,7 @@ use log::*;
 use tari_shutdown::ShutdownSignal;
 use tokio::{
 io::{AsyncRead, AsyncWrite},
-sync::{broadcast, mpsc},
+sync::{broadcast, mpsc, watch},
 };
 
 use super::{CommsBuilderError, CommsShutdown};
@@ -37,6 +37,8 @@ use crate::{
 ConnectionManagerRequest,
 ConnectionManagerRequester,
 ListenerInfo,
+LivenessCheck,
+LivenessStatus,
 },
 connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester},
 multiaddr::Multiaddr,
@@ -132,6 +134,7 @@ impl UnspawnedCommsNode {
 }
 
 /// Spawn a new node using the specified [Transport](crate::transports::Transport).
+#[allow(clippy::too_many_lines)]
 pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
 where
 TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
@@ -187,8 +190,8 @@ impl UnspawnedCommsNode {
 let noise_config = NoiseConfig::new(node_identity.clone());
 
 let mut connection_manager = ConnectionManager::new(
-connection_manager_config,
-transport,
+connection_manager_config.clone(),
+transport.clone(),
 noise_config,
 dial_backoff,
 connection_manager_request_rx,
@@ -235,13 +238,27 @@ impl UnspawnedCommsNode {
 node_identity.public_address()
 );
 
+// Spawn liveness check now that we have the final address
+let liveness_watch = connection_manager_config
+.liveness_self_check_interval
+.map(|interval| {
+LivenessCheck::spawn(
+transport,
+node_identity.public_address(),
+interval,
+shutdown_signal.clone(),
+)
+})
+.unwrap_or_else(|| watch::channel(LivenessStatus::Disabled).1);
+
 Ok(CommsNode {
 shutdown_signal,
 connection_manager_requester,
 connectivity_requester,
 listening_info,
 node_identity,
 peer_manager,
+liveness_watch,
 hidden_service,
 complete_signals: ext_context.drain_complete_signals(),
 })
@@ -286,6 +303,8 @@ pub struct CommsNode {
 peer_manager: Arc<PeerManager>,
 /// The bind addresses of the listener(s)
 listening_info: ListenerInfo,
+/// Current liveness status
+liveness_watch: watch::Receiver<LivenessStatus>,
 /// `Some` if the comms node is configured to run via a hidden service, otherwise `None`
 hidden_service: Option<tor::HiddenService>,
 /// The 'reciprocal' shutdown signals for each comms service
@@ -328,6 +347,11 @@ impl CommsNode {
 &self.listening_info
 }
 
+/// Returns the current liveness status
+pub fn liveness_status(&self) -> LivenessStatus {
+*self.liveness_watch.borrow()
+}
+
 /// Return the Ip/Tcp address that this node is listening on
 pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
 self.hidden_service.as_ref()
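The `.unwrap_or_else(|| watch::channel(LivenessStatus::Disabled).1)` fallback leans on a property of `tokio::sync::watch`: a receiver stays usable after its sender is dropped and keeps returning the last stored value, so `CommsNode::liveness_status()` can always `borrow()` it. A minimal standalone sketch of that behaviour (the enum is a stand-in for the comms `LivenessStatus`, not the real type):

```rust
use tokio::sync::watch;

// Stand-in for tari_comms' LivenessStatus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LivenessStatus {
    Disabled,
}

#[tokio::main]
async fn main() {
    // Keep only the receiver; the sender is dropped immediately, exactly as in
    // the `unwrap_or_else` branch above.
    let mut rx = watch::channel(LivenessStatus::Disabled).1;

    // borrow() still works and always yields the initial value.
    assert_eq!(*rx.borrow(), LivenessStatus::Disabled);

    // Only waiting for a change fails, because the sender is gone.
    assert!(rx.changed().await.is_err());
}
```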
32 changes: 6 additions & 26 deletions comms/core/src/connection_manager/listener.rs
@@ -36,7 +36,7 @@ use log::*;
 use tari_shutdown::{oneshot_trigger, oneshot_trigger::OneshotTrigger, ShutdownSignal};
 use tokio::{
 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
-sync::{mpsc, watch},
+sync::mpsc,
 time,
 };
 use tokio_stream::StreamExt;
@@ -53,7 +53,7 @@ use super::{
 use crate::{
 bounded_executor::BoundedExecutor,
 connection_manager::{
-liveness::{LivenessCheck, LivenessSession, LivenessStatus},
+liveness::LivenessSession,
 metrics,
 wire_mode::{WireMode, LIVENESS_WIRE_MODE},
 },
@@ -83,7 +83,7 @@ pub struct PeerListener<TTransport> {
 node_identity: Arc<NodeIdentity>,
 our_supported_protocols: Vec<ProtocolId>,
 liveness_session_count: Arc<AtomicUsize>,
-on_listening: OneshotTrigger<Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError>>,
+on_listening: OneshotTrigger<Result<Multiaddr, ConnectionManagerError>>,
 }
 
 impl<TTransport> PeerListener<TTransport>
@@ -121,10 +121,7 @@ where
 /// in binding the listener socket
 // This returns an impl Future and is not async because we want to exclude &self from the future so that it has a
 // 'static lifetime as well as to flatten the oneshot result for ergonomics
-pub fn on_listening(
-&self,
-) -> impl Future<Output = Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError>> + 'static
-{
+pub fn on_listening(&self) -> impl Future<Output = Result<Multiaddr, ConnectionManagerError>> + 'static {
 let signal = self.on_listening.to_signal();
 signal.map(|r| r.ok_or(ConnectionManagerError::ListenerOneshotCancelled)?)
 }
@@ -135,7 +132,7 @@
 self
 }
 
-pub async fn listen(self) -> Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError> {
+pub async fn listen(self) -> Result<Multiaddr, ConnectionManagerError> {
 let on_listening = self.on_listening();
 runtime::current().spawn(self.run());
 on_listening.await
@@ -148,9 +145,7 @@
 Ok((mut inbound, address)) => {
 info!(target: LOG_TARGET, "Listening for peer connections on '{}'", address);
 
-let liveness_watch = self.spawn_liveness_check();
-
-self.on_listening.broadcast(Ok((address, liveness_watch)));
+self.on_listening.broadcast(Ok(address));
 
 loop {
 tokio::select! {
@@ -234,21 +229,6 @@
 });
 }
 
-fn spawn_liveness_check(&self) -> watch::Receiver<LivenessStatus> {
-match self.config.liveness_self_check_interval {
-Some(interval) => LivenessCheck::spawn(
-self.transport.clone(),
-self.node_identity.public_address(),
-interval,
-self.shutdown_signal.clone(),
-),
-None => {
-let (_, rx) = watch::channel(LivenessStatus::Disabled);
-rx
-},
-}
-}
-
 async fn spawn_listen_task(&self, mut socket: TTransport::Output, peer_addr: Multiaddr) {
 let node_identity = self.node_identity.clone();
 let peer_manager = self.peer_manager.clone();
21 changes: 4 additions & 17 deletions comms/core/src/connection_manager/manager.rs
@@ -28,7 +28,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal};
 use time::Duration;
 use tokio::{
 io::{AsyncRead, AsyncWrite},
-sync::{broadcast, mpsc, oneshot, watch},
+sync::{broadcast, mpsc, oneshot},
 task,
 time,
 };
@@ -43,7 +43,7 @@ use super::{
 };
 use crate::{
 backoff::Backoff,
-connection_manager::{liveness::LivenessStatus, metrics, ConnectionDirection, ConnectionId},
+connection_manager::{metrics, ConnectionDirection, ConnectionId},
 multiplexing::Substream,
 noise::NoiseConfig,
 peer_manager::{NodeId, NodeIdentity, PeerManagerError},
@@ -149,7 +149,6 @@ impl Default for ConnectionManagerConfig {
 pub struct ListenerInfo {
 bind_address: Multiaddr,
 aux_bind_address: Option<Multiaddr>,
-liveness_watch: watch::Receiver<LivenessStatus>,
 }
 
 impl ListenerInfo {
@@ -163,17 +162,6 @@ impl ListenerInfo {
 pub fn auxiliary_bind_address(&self) -> Option<&Multiaddr> {
 self.aux_bind_address.as_ref()
 }
-
-/// Returns the current liveness status
-pub fn liveness_status(&self) -> LivenessStatus {
-*self.liveness_watch.borrow()
-}
-
-/// Waits for liveness status to change from the last time the value was checked.
-pub async fn liveness_status_changed(&mut self) -> Option<LivenessStatus> {
-self.liveness_watch.changed().await.ok()?;
-Some(*self.liveness_watch.borrow())
-}
 }
 
 /// The actor responsible for connection management.
@@ -346,17 +334,16 @@
 listener.set_supported_protocols(self.protocols.get_supported_protocols());
 
 let mut listener_info = match listener.listen().await {
-Ok((bind_address, liveness_watch)) => ListenerInfo {
+Ok(bind_address) => ListenerInfo {
 bind_address,
 aux_bind_address: None,
-liveness_watch,
 },
 Err(err) => return Err(err),
 };
 
 if let Some(mut listener) = self.aux_listener.take() {
 listener.set_supported_protocols(self.protocols.get_supported_protocols());
-let (addr, _) = listener.listen().await?;
+let addr = listener.listen().await?;
 debug!(target: LOG_TARGET, "Aux TCP listener bound to address {}", addr);
 listener_info.aux_bind_address = Some(addr);
 }
1 change: 1 addition & 0 deletions comms/core/src/connection_manager/mod.rs
@@ -52,6 +52,7 @@ mod peer_connection;
 pub use peer_connection::{ConnectionId, NegotiatedSubstream, PeerConnection, PeerConnectionRequest};
 
 mod liveness;
+pub(crate) use liveness::LivenessCheck;
 pub use liveness::LivenessStatus;
 
 mod wire_mode;
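The `pub(crate)` re-export is what lets `comms_node.rs` call `LivenessCheck::spawn` now that the spawn has moved out of the listener, without adding `LivenessCheck` to the crate's public API; only `LivenessStatus` remains externally visible. A toy illustration of that visibility split (module and item names are placeholders, not the real crate layout):

```rust
mod connection_manager {
    mod liveness {
        #[allow(dead_code)]
        pub struct LivenessCheck;

        pub enum LivenessStatus {
            Disabled,
        }
    }

    pub(crate) use liveness::LivenessCheck; // visible within this crate only
    pub use liveness::LivenessStatus; // part of the public surface
}

mod builder {
    // Same crate, so the crate-private re-export is reachable here, mirroring
    // how the builder can now spawn the liveness check itself.
    #[allow(unused_imports)]
    use super::connection_manager::LivenessCheck;
}

fn main() {
    // External users of the crate only ever see the status type.
    let _status = connection_manager::LivenessStatus::Disabled;
}
```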
6 changes: 3 additions & 3 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
@@ -66,7 +66,7 @@ async fn listen() -> Result<(), Box<dyn Error>> {
 shutdown.to_signal(),
 );
 
-let (mut bind_addr, _) = listener.listen().await?;
+let mut bind_addr = listener.listen().await?;
 
 unpack_enum!(Protocol::Memory(port) = bind_addr.pop().unwrap());
 assert!(port > 0);
@@ -103,7 +103,7 @@ async fn smoke() {
 listener.set_supported_protocols(supported_protocols.clone());
 
 // Get the listening address of the peer
-let (address, _) = listener.listen().await.unwrap();
+let address = listener.listen().await.unwrap();
 
 let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
 let noise_config2 = NoiseConfig::new(node_identity2.clone());
@@ -207,7 +207,7 @@ async fn banned() {
 listener.set_supported_protocols(supported_protocols.clone());
 
 // Get the listener address of the peer
-let (address, _) = listener.listen().await.unwrap();
+let address = listener.listen().await.unwrap();
 
 let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
 // The listener has banned the dialer peer