Skip to content

Commit

Permalink
Merging dev #6655
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Nov 27, 2024
1 parent b0d71de commit c090160
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 36 deletions.
8 changes: 7 additions & 1 deletion applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ use tari_p2p::{
initialization::P2pInitializer,
message::TariNodeMessageSpec,
peer_seeds::SeedPeer,
services::liveness::{config::LivenessConfig, LivenessInitializer},
services::{
liveness::{config::LivenessConfig, LivenessInitializer},
monitor_peers::MonitorPeersInitializer,
},
Dispatcher,
P2pConfig,
};
Expand Down Expand Up @@ -142,6 +145,9 @@ where B: BlockchainBackend + 'static
},
dispatcher.clone(),
))
.add_initializer(MonitorPeersInitializer::new(
base_node_config.metadata_auto_ping_interval,
))
.add_initializer(ChainMetadataServiceInitializer)
.add_initializer(BaseNodeStateMachineInitializer::new(
self.db.clone().into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ mod test {
metadata,
peer_id,
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand All @@ -387,6 +388,7 @@ mod test {
metadata,
peer_id: node_id,
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand All @@ -405,6 +407,7 @@ mod test {
metadata,
peer_id: node_id,
latency: None,
nonce: 0,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
Expand Down
27 changes: 20 additions & 7 deletions base_layer/p2p/src/services/liveness/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::proto::liveness::MetadataKey;
pub enum LivenessRequest {
/// Send a ping to the given node ID
SendPing(PeerId),
/// Ping a list of peers
SendPings(Vec<PeerId>),
/// Retrieve the total number of pings received
GetPingCount,
/// Retrieve the total number of pongs received
Expand All @@ -55,7 +57,7 @@ pub enum LivenessRequest {
#[derive(Debug)]
pub enum LivenessResponse {
/// Indicates that the request succeeded
Ok,
Ok(Option<Vec<u64>>),
/// Used to return a counter value from `GetPingCount` and `GetPongCount`
Count(usize),
/// Response for GetAvgLatency and GetNetworkAvgLatency
Expand All @@ -82,14 +84,17 @@ pub struct PingPongEvent {
pub latency: Option<Duration>,
/// Metadata of the corresponding node
pub metadata: Metadata,
/// The nonce of the ping/pong message, for clients that want to match pings with pongs
pub nonce: u64,
}

impl PingPongEvent {
pub fn new(peer_id: PeerId, latency: Option<Duration>, metadata: Metadata) -> Self {
pub fn new(peer_id: PeerId, latency: Option<Duration>, metadata: Metadata, nonce: u64) -> Self {
Self {
peer_id,
latency,
metadata,
nonce,
}
}
}
Expand Down Expand Up @@ -120,9 +125,17 @@ impl LivenessHandle {
}

/// Send a ping to a given node ID
pub async fn send_ping(&mut self, peer_id: PeerId) -> Result<(), LivenessError> {
pub async fn send_ping(&mut self, peer_id: PeerId) -> Result<u64, LivenessError> {
match self.handle.call(LivenessRequest::SendPing(peer_id)).await?? {
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(Some(nonces)) => Ok(nonces[0]),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Pings every peer in `node_ids` through a single `LivenessRequest::SendPings` call.
///
/// Returns the nonces carried by a `LivenessResponse::Ok(Some(..))` reply. Any other reply is
/// reported as `LivenessError::UnexpectedApiResponse`.
pub async fn send_pings(&mut self, node_ids: Vec<PeerId>) -> Result<Vec<u64>, LivenessError> {
    let response = self.handle.call(LivenessRequest::SendPings(node_ids)).await??;
    if let LivenessResponse::Ok(Some(nonces)) = response {
        Ok(nonces)
    } else {
        Err(LivenessError::UnexpectedApiResponse)
    }
}
Expand Down Expand Up @@ -150,15 +163,15 @@ impl LivenessHandle {
.call(LivenessRequest::SetMetadataEntry(key, value))
.await??
{
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Add a monitored peer to the basic config if not present
pub async fn check_add_monitored_peer(&mut self, peer_id: PeerId) -> Result<(), LivenessError> {
match self.handle.call(LivenessRequest::AddMonitoredPeer(peer_id)).await?? {
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}
Expand All @@ -170,7 +183,7 @@ impl LivenessHandle {
.call(LivenessRequest::RemoveMonitoredPeer(peer_id))
.await??
{
LivenessResponse::Ok => Ok(()),
LivenessResponse::Ok(_) => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}
Expand Down
12 changes: 8 additions & 4 deletions base_layer/p2p/src/services/liveness/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl LivenessMock {
self.mock_state.add_request_call(req.clone());
match req {
SendPing(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(Some(vec![0])))).unwrap();
},
SendPings(node_ids) => {
let nonces: Vec<u64> = (0..node_ids.len() as u64).collect();
reply.send(Ok(LivenessResponse::Ok(Some(nonces)))).unwrap();
},
GetPingCount => {
reply.send(Ok(LivenessResponse::Count(1))).unwrap();
Expand All @@ -140,13 +144,13 @@ impl LivenessMock {
reply.send(Ok(LivenessResponse::AvgLatency(None))).unwrap();
},
SetMetadataEntry(_, _) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
AddMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
RemoveMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
},
}
}
Expand Down
4 changes: 3 additions & 1 deletion base_layer/p2p/src/services/liveness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub use handle::{LivenessEvent, LivenessHandle, LivenessRequest, LivenessRespons

mod message;
mod service;
pub use service::MAX_INFLIGHT_TTL;

mod state;
pub use state::Metadata;
Expand All @@ -64,6 +65,7 @@ use tari_service_framework::{
use tokio::sync::{broadcast, mpsc};

use self::service::LivenessService;
pub use crate::proto::liveness::MetadataKey;
use crate::{
message::TariNodeMessageSpec,
proto::message::TariMessageType,
Expand All @@ -74,7 +76,7 @@ const LOG_TARGET: &str = "p2p::services::liveness";

/// Initializer for the Liveness service handle and service future.
pub struct LivenessInitializer {
config: Option<LivenessConfig>,
pub(crate) config: Option<LivenessConfig>,
dispatcher: Dispatcher,
}

Expand Down
54 changes: 42 additions & 12 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{collections::HashSet, iter, sync::Arc, time::Instant};
use std::{
collections::HashSet,
iter,
sync::Arc,
time::{Duration, Instant},
};

use futures::{future::Either, pin_mut, stream::StreamExt, Stream};
use log::*;
Expand All @@ -45,6 +50,8 @@ use crate::{
services::liveness::{handle::LivenessEventSender, LivenessEvent, PingPongEvent},
};

/// Time-to-live applied to inflight pings when `auto_ping_interval` is not set in the liveness
/// config; passed to `add_inflight_ping` as the fallback value.
pub const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(30);

/// Service responsible for testing Liveness of Peers.
pub struct LivenessService<THandleStream> {
config: LivenessConfig,
Expand Down Expand Up @@ -163,7 +170,8 @@ where TRequestStream: Stream<Item = RequestContext<LivenessRequest, Result<Liven
message_tag,
);

let ping_event = PingPongEvent::new(source_peer_id, None, ping_pong_msg.metadata.into());
let ping_event =
PingPongEvent::new(source_peer_id, None, ping_pong_msg.metadata.into(), ping_pong_msg.nonce);
self.publish_event(LivenessEvent::ReceivedPing(Box::new(ping_event)));
},
PingPong::Pong => {
Expand All @@ -188,19 +196,29 @@ where TRequestStream: Stream<Item = RequestContext<LivenessRequest, Result<Liven
message_tag,
);

let pong_event = PingPongEvent::new(source_peer_id, maybe_latency, ping_pong_msg.metadata.into());
let pong_event = PingPongEvent::new(
source_peer_id,
maybe_latency,
ping_pong_msg.metadata.into(),
ping_pong_msg.nonce,
);
self.publish_event(LivenessEvent::ReceivedPong(Box::new(pong_event)));
},
}
Ok(())
}

async fn send_ping(&mut self, peer_id: PeerId) -> Result<(), LivenessError> {
async fn send_ping(&mut self, peer_id: PeerId) -> Result<u64, LivenessError> {
let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone());
self.state.add_inflight_ping(msg.nonce, peer_id);
let nonce = msg.nonce;
self.state.add_inflight_ping(
msg.nonce,
peer_id,
self.config.auto_ping_interval.unwrap_or(MAX_INFLIGHT_TTL),
);
debug!(target: LOG_TARGET, "Sending ping to peer '{}'", peer_id);
self.outbound_messaging.send_message(peer_id, msg).await?;
Ok(())
Ok(nonce)
}

async fn send_pong(&mut self, nonce: u64, dest: PeerId) -> Result<(), LivenessError> {
Expand All @@ -214,9 +232,17 @@ where TRequestStream: Stream<Item = RequestContext<LivenessRequest, Result<Liven
use LivenessRequest::*;
match request {
SendPing(peer_id) => {
self.send_ping(peer_id).await?;
let nonce = self.send_ping(peer_id).await?;
self.state.inc_pings_sent();
Ok(LivenessResponse::Ok)
Ok(LivenessResponse::Ok(Some(vec![nonce])))
},
SendPings(peer_ids) => {
let mut nonces = Vec::with_capacity(peer_ids.len());
for peer_id in peer_ids {
nonces.push(self.send_ping(peer_id).await?);
self.state.inc_pings_sent();
}
Ok(LivenessResponse::Ok(Some(nonces)))
},
GetPingCount => {
let ping_count = self.get_ping_count();
Expand All @@ -236,15 +262,15 @@ where TRequestStream: Stream<Item = RequestContext<LivenessRequest, Result<Liven
},
SetMetadataEntry(key, value) => {
self.state.set_metadata_entry(key, value);
Ok(LivenessResponse::Ok)
Ok(LivenessResponse::Ok(None))
},
AddMonitoredPeer(peer_id) => {
self.monitored_peers.insert(peer_id);
Ok(LivenessResponse::Ok)
Ok(LivenessResponse::Ok(None))
},
RemoveMonitoredPeer(peer_id) => {
self.monitored_peers.remove(&peer_id);
Ok(LivenessResponse::Ok)
Ok(LivenessResponse::Ok(None))
},
}
}
Expand All @@ -270,7 +296,11 @@ where TRequestStream: Stream<Item = RequestContext<LivenessRequest, Result<Liven

for peer_id in iter {
let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone());
self.state.add_inflight_ping(msg.nonce, peer_id);
self.state.add_inflight_ping(
msg.nonce,
peer_id,
self.config.auto_ping_interval.unwrap_or(MAX_INFLIGHT_TTL),
);
self.outbound_messaging.send_message(peer_id, msg).await?;
count += 1;
}
Expand Down
20 changes: 10 additions & 10 deletions base_layer/p2p/src/services/liveness/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use super::LOG_TARGET;
use crate::proto::liveness::MetadataKey;

const LATENCY_SAMPLE_WINDOW_SIZE: usize = 25;
const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(40);

/// Represents metadata in a ping/pong message.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
Expand Down Expand Up @@ -136,17 +135,17 @@ impl LivenessState {
}

/// Adds a ping to the inflight ping list, while noting the current time that a ping was sent.
pub fn add_inflight_ping(&mut self, nonce: u64, peer_id: PeerId) {
pub fn add_inflight_ping(&mut self, nonce: u64, peer_id: PeerId, max_inflight_ttl: Duration) {
self.inflight_pings.insert(nonce, (peer_id, Instant::now()));
self.clear_stale_inflight_pings();
self.clear_stale_inflight_pings(max_inflight_ttl);
}

/// Clears inflight ping requests that have not been answered in time and adds them to the `failed_pings` counter
fn clear_stale_inflight_pings(&mut self) {
fn clear_stale_inflight_pings(&mut self, max_inflight_ttl: Duration) {
let (inflight, expired) = self
.inflight_pings
.drain()
.partition(|(_, (_, time))| time.elapsed() <= MAX_INFLIGHT_TTL);
.partition(|(_, (_, time))| time.elapsed() <= max_inflight_ttl);

self.inflight_pings = inflight;

Expand Down Expand Up @@ -267,6 +266,7 @@ mod test {
use tari_network::test_utils::random_peer_id;

use super::*;
use crate::services::liveness::service::MAX_INFLIGHT_TTL;

#[test]
fn new() {
Expand Down Expand Up @@ -324,7 +324,7 @@ mod test {
let mut state = LivenessState::new();

let peer_id = random_peer_id();
state.add_inflight_ping(123, peer_id);
state.add_inflight_ping(123, peer_id, MAX_INFLIGHT_TTL);

let latency = state.record_pong(123, &peer_id).unwrap();
assert!(latency < Duration::from_millis(50));
Expand All @@ -342,10 +342,10 @@ mod test {
let mut state = LivenessState::new();

let peer1 = random_peer_id();
state.add_inflight_ping(1, peer1);
state.add_inflight_ping(1, peer1, MAX_INFLIGHT_TTL);
let peer2 = random_peer_id();
state.add_inflight_ping(2, peer2);
state.add_inflight_ping(3, peer2);
state.add_inflight_ping(2, peer2, MAX_INFLIGHT_TTL);
state.add_inflight_ping(3, peer2, MAX_INFLIGHT_TTL);

assert!(!state.failed_pings.contains_key(&peer1));
assert!(!state.failed_pings.contains_key(&peer2));
Expand All @@ -356,7 +356,7 @@ mod test {
*time = Instant::now() - (MAX_INFLIGHT_TTL + Duration::from_secs(1));
}

state.clear_stale_inflight_pings();
state.clear_stale_inflight_pings(MAX_INFLIGHT_TTL);
let n = state.failed_pings.get(&peer1).unwrap();
assert_eq!(*n, 1);
let n = state.failed_pings.get(&peer2).unwrap();
Expand Down
1 change: 1 addition & 0 deletions base_layer/p2p/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@

pub mod dispatcher;
pub mod liveness;
pub mod monitor_peers;
// pub mod utils;
Loading

0 comments on commit c090160

Please sign in to comment.