Skip to content

Commit

Permalink
feat: prevent banning of base node in wallet
Browse files Browse the repository at this point in the history
This PR excludes the connected base node in the wallet from being banned.
  • Loading branch information
StriderDM committed Dec 3, 2021
1 parent 7d49fa4 commit 46a9b37
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 14 deletions.
4 changes: 4 additions & 0 deletions base_layer/wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ where

self.comms.peer_manager().add_peer(peer.clone()).await?;
self.wallet_connectivity.set_base_node(peer);
self.comms
.connectivity()
.message_rate_immune_peer(NodeId::from_key(&public_key))
.await?;

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions common/src/configuration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
cfg.set_default("base_node.weatherwax.pruning_horizon", 0).unwrap();
cfg.set_default("base_node.weatherwax.pruned_mode_cleanup_interval", 50)
.unwrap();
cfg.set_default("base_node.weatherwax.flood_ban_max_msg_count", 1000)
cfg.set_default("base_node.weatherwax.flood_ban_max_msg_count", 10000)
.unwrap();
cfg.set_default("base_node.weatherwax.peer_seeds", Vec::<String>::new())
.unwrap();
Expand Down Expand Up @@ -215,7 +215,8 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
cfg.set_default("base_node.igor.pruning_horizon", 0).unwrap();
cfg.set_default("base_node.igor.pruned_mode_cleanup_interval", 50)
.unwrap();
cfg.set_default("base_node.igor.flood_ban_max_msg_count", 1000).unwrap();
cfg.set_default("base_node.igor.flood_ban_max_msg_count", 10000)
.unwrap();
cfg.set_default("base_node.igor.grpc_enabled", false).unwrap();
cfg.set_default("base_node.igor.grpc_base_node_address", "127.0.0.1:18142")
.unwrap();
Expand Down
42 changes: 30 additions & 12 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ pub use metrics::{MetricsCollector, MetricsCollectorHandle};

use crate::{connectivity::metrics::MetricsError, event::DhtEvent, DhtActorError, DhtConfig, DhtRequester};
use log::*;
use std::{sync::Arc, time::Instant};
use std::{
sync::{Arc, RwLock},
time::Instant,
};
use tari_comms::{
connectivity::{ConnectivityError, ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester},
peer_manager::{node_id::NodeDistance, NodeId, PeerManagerError, PeerQuery, PeerQuerySortBy},
Expand Down Expand Up @@ -67,6 +70,7 @@ pub struct DhtConnectivity {
config: DhtConfig,
peer_manager: Arc<PeerManager>,
node_identity: Arc<NodeIdentity>,
message_rate_immune_node: Arc<RwLock<Option<NodeId>>>,
connectivity: ConnectivityRequester,
dht_requester: DhtRequester,
/// List of neighbours managed by DhtConnectivity ordered by distance from this node
Expand Down Expand Up @@ -102,6 +106,7 @@ impl DhtConnectivity {
config,
peer_manager,
node_identity,
message_rate_immune_node: Arc::new(RwLock::new(None)),
connectivity,
dht_requester,
metrics_collector,
Expand Down Expand Up @@ -245,17 +250,24 @@ impl DhtConnectivity {
.await?;

for (peer, mps) in nodes {
warn!(
target: LOG_TARGET,
"Banning peer `{}` because of flooding. Message rate: {:.2}m/s", peer, mps
);
self.connectivity
.ban_peer_until(
peer,
self.config.ban_duration,
"Exceeded maximum message rate".to_string(),
)
.await?;
if *self.message_rate_immune_node.read().expect("Read lock should not fail") != Some(peer.clone()) {
warn!(
target: LOG_TARGET,
"Banning peer `{}` because of flooding. Message rate: {:.2}m/s", peer, mps
);
self.connectivity
.ban_peer_until(
peer,
self.config.ban_duration,
"Exceeded maximum message rate".to_string(),
)
.await?;
} else {
warn!(
target: LOG_TARGET,
"Peer `{}` immune from banning. Message rate: {:.2}m/s", peer, mps
);
}
}
Ok(())
}
Expand Down Expand Up @@ -470,6 +482,12 @@ impl DhtConnectivity {
debug!(target: LOG_TARGET, "Node is OFFLINE");
self.refresh_peer_pools().await?;
},
MessageRateImmuneNode(node_id) => {
*self
.message_rate_immune_node
.write()
.expect("Write lock should not fail") = Some(node_id);
},
_ => {},
}

Expand Down
12 changes: 12 additions & 0 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ impl ConnectivityManagerActor {
error!(target: LOG_TARGET, "Error when banning peer: {:?}", err);
}
},
MessageRateImmuneNode(node_id) => {
if let Err(err) = self.message_rate_immune_peer(&node_id).await {
error!(target: LOG_TARGET, "Error when excluding peer from banning: {:?}", err);
}
},
GetActiveConnections(reply) => {
let _ = reply.send(
self.pool
Expand Down Expand Up @@ -748,6 +753,13 @@ impl ConnectivityManagerActor {
}
Ok(())
}

async fn message_rate_immune_peer(&mut self, node_id: &NodeId) -> Result<(), ConnectivityError> {
info!(target: LOG_TARGET, "Excluding peer {} from banning", node_id);
self.publish_event(ConnectivityEvent::MessageRateImmuneNode(node_id.clone()));

Ok(())
}
}

fn delayed_close(conn: PeerConnection, delay: Duration) {
Expand Down
11 changes: 11 additions & 0 deletions comms/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum ConnectivityEvent {
PeerConnected(PeerConnection),
PeerConnectFailed(NodeId),
PeerBanned(NodeId),
MessageRateImmuneNode(NodeId),
PeerOffline(NodeId),
PeerConnectionWillClose(NodeId, ConnectionDirection),

Expand All @@ -71,6 +72,7 @@ impl fmt::Display for ConnectivityEvent {
PeerConnected(node_id) => write!(f, "PeerConnected({})", node_id),
PeerConnectFailed(node_id) => write!(f, "PeerConnectFailed({})", node_id),
PeerBanned(node_id) => write!(f, "PeerBanned({})", node_id),
MessageRateImmuneNode(node_id) => write!(f, "MessageRateImmuneNode({})", node_id),
PeerOffline(node_id) => write!(f, "PeerOffline({})", node_id),
PeerConnectionWillClose(node_id, direction) => {
write!(f, "PeerConnectionWillClose({}, {})", node_id, direction)
Expand Down Expand Up @@ -99,6 +101,7 @@ pub enum ConnectivityRequest {
GetAllConnectionStates(oneshot::Sender<Vec<PeerConnectionState>>),
GetActiveConnections(oneshot::Sender<Vec<PeerConnection>>),
BanPeer(NodeId, Duration, String),
MessageRateImmuneNode(NodeId),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -243,6 +246,14 @@ impl ConnectivityRequester {
.await
}

pub async fn message_rate_immune_peer(&mut self, node_id: NodeId) -> Result<(), ConnectivityError> {
self.sender
.send(ConnectivityRequest::MessageRateImmuneNode(node_id))
.await
.map_err(|_| ConnectivityError::ActorDisconnected)?;
Ok(())
}

pub async fn wait_started(&mut self) -> Result<(), ConnectivityError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
Expand Down
1 change: 1 addition & 0 deletions comms/src/test_utils/mocks/connectivity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl ConnectivityManagerMock {
},
GetAllConnectionStates(_) => unimplemented!(),
BanPeer(_, _, _) => {},
MessageRateImmuneNode(_) => {},
GetActiveConnections(reply) => {
self.state
.with_state(|state| reply.send(state.active_conns.values().cloned().collect()).unwrap())
Expand Down

0 comments on commit 46a9b37

Please sign in to comment.