feat(comms): adds periodic socket-level liveness checks #4819

Merged
8 changes: 7 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp, str::FromStr, sync::Arc};
use std::{cmp, str::FromStr, sync::Arc, time::Duration};

use log::*;
use tari_app_utilities::{consts, identity_management, identity_management::load_from_json};
@@ -106,6 +106,12 @@ where B: BlockchainBackend + 'static
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
p2p_config.transport.tor.identity = tor_identity;

// TODO: This should probably be disabled in future and have it optionally set/unset in the config - this check
Collaborator:
Can you add more detail about how this leaks the IP address?

@sdbondi (Member, Author), Oct 19, 2022:
Sure, TL;DR is that the "pingpong" text is periodically sent on an unencrypted socket, allowing anyone observing the traffic to recognise the sending IP address as almost certainly a Tari node. Over Tor, a Tor relay can make this observation; over clearnet, an ISP or MITM can. In fairness, this observation can probably also be made heuristically, e.g. by observing the first network byte.

// does allow MITM/ISP/tor router to connect this node's IP to a destination IP/onion address.
// Specifically, "pingpong" text is periodically sent on an unencrypted socket allowing anyone observing
// the traffic to recognise the sending IP address as almost certainly a tari node.
p2p_config.listener_liveness_check_interval = Some(Duration::from_secs(15));

let mut handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
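For context on the review thread above, here is a minimal, illustrative sketch of what a socket-level liveness probe of this kind looks like, assuming a plain TCP listener, a fixed probe payload and an echoed reply (none of these details are taken from the tari_comms implementation). Because the probe bytes travel in the clear, an on-path observer can fingerprint the sender, which is exactly the leak discussed in the comment above.

```rust
// Illustrative sketch only, not the tari_comms implementation: periodically dial the
// node's own public listener, send a plaintext probe and time the echoed reply.
use std::time::{Duration, Instant};

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
    time,
};

async fn liveness_probe_loop(public_addr: &str, interval: Duration) -> std::io::Result<()> {
    let mut ticker = time::interval(interval);
    loop {
        ticker.tick().await;
        let started = Instant::now();
        let mut socket = TcpStream::connect(public_addr).await?;
        socket.write_all(b"pingpong").await?; // plaintext probe, visible to any on-path observer
        let mut reply = [0u8; 8];
        socket.read_exact(&mut reply).await?;
        println!("listener live, latency {:.2?}", started.elapsed());
    }
}
```

The check added in this PR is driven by `listener_liveness_check_interval` and surfaces its result as a `LivenessStatus`, shown in the status command changes further down.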
5 changes: 3 additions & 2 deletions applications/tari_base_node/src/commands/command/add_peer.rs
@@ -44,7 +44,8 @@ pub struct ArgsAddPeer {
impl HandleCommand<ArgsAddPeer> for CommandContext {
async fn handle_command(&mut self, args: ArgsAddPeer) -> Result<(), Error> {
let public_key = args.public_key.into();
if self.peer_manager.exists(&public_key).await {
let peer_manager = self.comms.peer_manager();
if peer_manager.exists(&public_key).await {
return Err(anyhow!("Peer with public key '{}' already exists", public_key));
}
let node_id = NodeId::from_public_key(&public_key);
@@ -57,7 +58,7 @@ impl HandleCommand<ArgsAddPeer> for CommandContext {
vec![],
String::new(),
);
self.peer_manager.add_peer(peer).await?;
peer_manager.add_peer(peer).await?;
println!("Peer with node id '{}'was added to the base node.", node_id);
Ok(())
}
5 changes: 3 additions & 2 deletions applications/tari_base_node/src/commands/command/ban_peer.rs
@@ -80,13 +80,14 @@ impl CommandContext {
if self.base_node_identity.node_id() == &node_id {
Err(ArgsError::BanSelf.into())
} else if must_ban {
self.connectivity
self.comms
.connectivity()
.ban_peer_until(node_id.clone(), duration, "UI manual ban".to_string())
.await?;
println!("Peer was banned in base node.");
Ok(())
} else {
self.peer_manager.unban_peer(&node_id).await?;
self.comms.peer_manager().unban_peer(&node_id).await?;
println!("Peer ban was removed from base node.");
Ok(())
}
@@ -48,7 +48,7 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
/// Function to process the dial-peer command
pub async fn dial_peer(&self, dest_node_id: NodeId) -> Result<(), Error> {
let connectivity = self.connectivity.clone();
let connectivity = self.comms.connectivity();
task::spawn(async move {
let start = Instant::now();
println!("☎️ Dialing peer...");
6 changes: 3 additions & 3 deletions applications/tari_base_node/src/commands/command/get_peer.rs
@@ -63,16 +63,16 @@ enum ArgsError {

impl CommandContext {
pub async fn get_peer(&self, partial: Vec<u8>, original_str: String) -> Result<(), Error> {
let peers = self.peer_manager.find_all_starts_with(&partial).await?;
let peer_manager = self.comms.peer_manager();
let peers = peer_manager.find_all_starts_with(&partial).await?;
let peer = {
if let Some(peer) = peers.into_iter().next() {
peer
} else {
let pk = parse_emoji_id_or_public_key(&original_str).ok_or_else(|| ArgsError::NoPeerMatching {
original_str: original_str.clone(),
})?;
let peer = self
.peer_manager
let peer = peer_manager
.find_by_public_key(&pk)
.await?
.ok_or(ArgsError::NoPeerMatching { original_str })?;
@@ -53,9 +53,9 @@ impl CommandContext {
"User Agent",
"Info",
]);
let peer_manager = self.comms.peer_manager();
for conn in conns {
let peer = self
.peer_manager
let peer = peer_manager
.find_by_node_id(conn.peer_node_id())
.await
.expect("Unexpected peer database error")
@@ -105,7 +105,7 @@ impl CommandContext
impl CommandContext {
/// Function to process the list-connections command
pub async fn list_connections(&mut self) -> Result<(), Error> {
let conns = self.connectivity.get_active_connections().await?;
let conns = self.comms.connectivity().get_active_connections().await?;
let (mut nodes, mut clients) = conns
.into_iter()
.partition::<Vec<_>, _>(|a| a.peer_features().is_node());
@@ -54,7 +54,7 @@ impl CommandContext {
_ => false,
})
}
let peers = self.peer_manager.perform_query(query).await?;
let peers = self.comms.peer_manager().perform_query(query).await?;
let num_peers = peers.len();
println!();
let mut table = Table::new();
12 changes: 5 additions & 7 deletions applications/tari_base_node/src/commands/command/mod.rs
@@ -65,9 +65,9 @@ use async_trait::async_trait;
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use strum::{EnumVariantNames, VariantNames};
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::{Peer, PeerManager, PeerManagerError, PeerQuery},
peer_manager::{Peer, PeerManagerError, PeerQuery},
protocol::rpc::RpcServerHandle,
CommsNode,
NodeIdentity,
};
use tari_comms_dht::{DhtDiscoveryRequester, MetricsCollectorHandle};
@@ -155,8 +155,7 @@ pub struct CommandContext {
dht_metrics_collector: MetricsCollectorHandle,
rpc_server: RpcServerHandle,
base_node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
connectivity: ConnectivityRequester,
comms: CommsNode,
liveness: LivenessHandle,
node_service: LocalNodeCommsInterface,
mempool_service: LocalMempoolService,
@@ -176,8 +175,7 @@ impl CommandContext {
dht_metrics_collector: ctx.base_node_dht().metrics_collector(),
rpc_server: ctx.rpc_server(),
base_node_identity: ctx.base_node_identity(),
peer_manager: ctx.base_node_comms().peer_manager(),
connectivity: ctx.base_node_comms().connectivity(),
comms: ctx.base_node_comms().clone(),
liveness: ctx.liveness(),
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
@@ -297,7 +295,7 @@ impl HandleCommand<Command> for CommandContext {

impl CommandContext {
async fn fetch_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
let pm = &self.peer_manager;
let pm = self.comms.peer_manager();
let query = PeerQuery::new().select_where(|p| p.is_banned());
pm.perform_query(query).await
}
@@ -40,7 +40,8 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
pub async fn reset_offline_peers(&self) -> Result<(), Error> {
let num_updated = self
.peer_manager
.comms
.peer_manager()
.update_each(|mut peer| {
if peer.is_offline() {
peer.set_offline(false);
17 changes: 16 additions & 1 deletion applications/tari_base_node/src/commands/command/status.rs
@@ -27,6 +27,7 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use clap::Parser;
use tari_app_utilities::consts;
use tari_comms::connection_manager::LivenessStatus;
use tokio::time;

use super::{CommandContext, HandleCommand};
@@ -47,6 +48,7 @@ impl HandleCommand<Args> for CommandContext {
}

impl CommandContext {
#[allow(clippy::too_many_lines)]
pub async fn status(&mut self, output: StatusLineOutput) -> Result<(), Error> {
let mut full_log = false;
if self.last_time_full.elapsed() > Duration::from_secs(120) {
@@ -102,7 +104,7 @@ impl CommandContext {
status_line.add_field("Mempool", "query timed out");
};

let conns = self.connectivity.get_active_connections().await?;
let conns = self.comms.connectivity().get_active_connections().await?;
let (num_nodes, num_clients) = conns.iter().fold((0usize, 0usize), |(nodes, clients), conn| {
if conn.peer_features().is_node() {
(nodes + 1, clients)
@@ -139,6 +141,19 @@ impl CommandContext {
);
}

match self.comms.listening_info().liveness_status() {
LivenessStatus::Disabled => {},
LivenessStatus::Checking => {
status_line.add("⏳️️");
},
LivenessStatus::Unreachable => {
status_line.add("‼️");
},
LivenessStatus::Live(latency) => {
status_line.add(format!("⚡️ {:.2?}", latency));
},
}

let target = "base_node::app::status";
match output {
StatusLineOutput::StdOutAndLog => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ impl HandleCommand<Args> for CommandContext {
impl CommandContext {
pub async fn unban_all_peers(&self) -> Result<(), Error> {
let query = PeerQuery::new().select_where(|p| p.is_banned());
let peers = self.peer_manager.perform_query(query).await?;
let peer_manager = self.comms.peer_manager();
let peers = peer_manager.perform_query(query).await?;
let num_peers = peers.len();
for peer in peers {
if let Err(err) = self.peer_manager.unban_peer(&peer.node_id).await {
if let Err(err) = peer_manager.unban_peer(&peer.node_id).await {
println!("Failed to unban peer: {}", err);
}
}
6 changes: 5 additions & 1 deletion applications/tari_base_node/src/commands/status_line.rs
@@ -43,6 +43,10 @@ impl StatusLine {
Default::default()
}

pub fn add<T: ToString>(&mut self, value: T) -> &mut Self {
self.add_field("", value)
}

pub fn add_field<T: ToString>(&mut self, name: &'static str, value: T) -> &mut Self {
self.fields.push((name, value.to_string()));
self
@@ -54,7 +58,7 @@ impl Display for StatusLine {
write!(f, "{} ", Local::now().format("%H:%M"))?;
let s = self.fields.iter().map(|(k, v)| format(k, v)).collect::<Vec<_>>();

write!(f, "{}", s.join(", "))
write!(f, "{}", s.join(" "))
}
}

10 changes: 9 additions & 1 deletion base_layer/p2p/src/config.rs
@@ -20,11 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::path::{Path, PathBuf};
use std::{
path::{Path, PathBuf},
time::Duration,
};

use serde::{Deserialize, Serialize};
use tari_common::{
configuration::{
serializers,
utils::{deserialize_string_or_struct, serialize_string},
StringList,
},
@@ -105,6 +109,9 @@ pub struct P2pConfig {
/// Liveness sessions can be used by third party tooling to determine node liveness.
/// A value of 0 will disallow any liveness sessions.
pub listener_liveness_max_sessions: usize,
/// If Some, enables periodic socket-level liveness checks
#[serde(with = "serializers::optional_seconds")]
pub listener_liveness_check_interval: Option<Duration>,
/// CIDR for addresses allowed to enter into liveness check mode on the listener.
pub listener_liveness_allowlist_cidrs: StringList,
/// User agent string for this node
@@ -137,6 +144,7 @@ impl Default for P2pConfig {
},
allow_test_addresses: false,
listener_liveness_max_sessions: 0,
listener_liveness_check_interval: None,
listener_liveness_allowlist_cidrs: StringList::default(),
user_agent: String::new(),
auxiliary_tcp_listener_address: None,
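The new `listener_liveness_check_interval` field is serialized with `serializers::optional_seconds`, which is what lets the TOML presets below express the interval as a plain number of seconds. As a rough sketch of what such a helper might look like (an assumption for illustration; the real helper lives in `tari_common::configuration::serializers` and may differ):

```rust
// Sketch of an `optional_seconds`-style serde helper: maps an optional integer number
// of seconds in the config file to `Option<Duration>`. Assumed shape, for illustration.
use std::time::Duration;

use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(value: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
    match value {
        Some(duration) => serializer.serialize_some(&duration.as_secs()),
        None => serializer.serialize_none(),
    }
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where D: Deserializer<'de> {
    let secs = Option::<u64>::deserialize(deserializer)?;
    Ok(secs.map(Duration::from_secs))
}
```

With a helper of this shape, `listener_liveness_check_interval = 15` in the preset maps to `Some(Duration::from_secs(15))`, while the default config leaves the field as `None` (checks disabled).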
3 changes: 2 additions & 1 deletion base_layer/p2p/src/initialization.rs
@@ -543,7 +543,8 @@ impl ServiceInitializer for P2pInitializer {
minor_version: MINOR_NETWORK_VERSION,
network_byte: self.network.as_byte(),
user_agent: config.user_agent.clone(),
});
})
.set_liveness_check(config.listener_liveness_check_interval);

if config.allow_test_addresses || config.dht.allow_test_addresses {
// The default is false, so ensure that both settings are true in this case
1 change: 1 addition & 0 deletions base_layer/wallet/src/config.rs
@@ -123,6 +123,7 @@ impl Default for WalletConfig {
fn default() -> Self {
let p2p = P2pConfig {
datastore_path: PathBuf::from("peer_db/wallet"),
listener_liveness_check_interval: None,
..Default::default()
};
Self {
1 change: 1 addition & 0 deletions base_layer/wallet/tests/contacts_service.rs
@@ -98,6 +98,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
user_agent: "tari/test-wallet".to_string(),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
2 changes: 2 additions & 0 deletions base_layer/wallet/tests/wallet.rs
@@ -145,6 +145,7 @@ async fn create_wallet(
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};

let sql_database_path = comms_config
@@ -679,6 +680,7 @@ async fn test_import_utxo() {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};
let config = WalletConfig {
p2p: comms_config,
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3919,6 +3919,7 @@ pub unsafe extern "C" fn comms_config_create(
user_agent: format!("tari/mobile_wallet/{}", env!("CARGO_PKG_VERSION")),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
};

Box::into_raw(Box::new(config))
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node.toml
@@ -178,6 +178,8 @@ track_reorgs = true

# CIDR for addresses allowed to enter into liveness check mode on the listener.
#listener_liveness_allowlist_cidrs = []
# Enables periodic socket-level liveness checks. Default: Disabled
listener_liveness_check_interval = 15

# User agent string for this node
#user_agent = ""
2 changes: 2 additions & 0 deletions common/config/presets/d_console_wallet.toml
@@ -201,6 +201,8 @@ event_channel_size = 3500

# CIDR for addresses allowed to enter into liveness check mode on the listener.
#listener_liveness_allowlist_cidrs = []
# Enables periodic socket-level liveness checks. Default: Disabled
# listener_liveness_check_interval = 15

# User agent string for this node
#user_agent = ""
13 changes: 12 additions & 1 deletion comms/core/src/builder/comms_node.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{iter, sync::Arc};
use std::{iter, sync::Arc, time::Duration};

use log::*;
use tari_shutdown::ShutdownSignal;
@@ -125,6 +125,12 @@ impl UnspawnedCommsNode {
self
}

/// Set the interval at which to run self-liveness checks against the configured public address, or `None` to disable them
pub fn set_liveness_check(mut self, interval: Option<Duration>) -> Self {
self.builder = self.builder.set_liveness_check(interval);
self
}

/// Spawn a new node using the specified [Transport](crate::transports::Transport).
pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
@@ -317,6 +323,11 @@ impl CommsNode {
self.listening_info.bind_address()
}

/// Return [ListenerInfo]
pub fn listening_info(&self) -> &ListenerInfo {
&self.listening_info
}

/// Return the Ip/Tcp address that this node is listening on
pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
self.hidden_service.as_ref()
6 changes: 6 additions & 0 deletions comms/core/src/builder/mod.rs
@@ -265,6 +265,12 @@ impl CommsBuilder {
self
}

/// Enable and set interval for self-liveness checks, or None to disable it (default)
pub fn set_liveness_check(mut self, check_interval: Option<Duration>) -> Self {
self.connection_manager_config.liveness_self_check_interval = check_interval;
self
}

fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
let file_lock = self.peer_storage_file_lock.take();

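Putting the new pieces together: the builder-level `set_liveness_check` enables the periodic check (e.g. `Some(Duration::from_secs(15))`, as the base node bootstrap does above), and the result is read back from the spawned node via `listening_info().liveness_status()`. Below is a minimal usage sketch based only on the APIs shown in this diff; everything else about constructing the node is elided.

```rust
// Usage sketch combining the APIs added in this PR: report the last observed
// socket-level liveness status of the public listener from a spawned CommsNode.
use tari_comms::{connection_manager::LivenessStatus, CommsNode};

fn report_listener_liveness(comms: &CommsNode) {
    match comms.listening_info().liveness_status() {
        LivenessStatus::Disabled => println!("liveness checks disabled"),
        LivenessStatus::Checking => println!("liveness check in progress"),
        LivenessStatus::Unreachable => println!("public listener unreachable"),
        LivenessStatus::Live(latency) => println!("listener live, latency {:.2?}", latency),
    }
}
```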