Skip to content

Commit

Permalink
Add contacts liveness service
Browse files Browse the repository at this point in the history
Implemented a liveness service for the base layer wallet's contacts
that wil send ping messages and respond with pong messages to indicate
liveness information.
  • Loading branch information
hansieodendaal committed Feb 21, 2022
1 parent be75c74 commit 4f09ee8
Show file tree
Hide file tree
Showing 26 changed files with 836 additions and 69 deletions.
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ where B: BlockchainBackend + 'static
.add_initializer(mempool_sync)
.add_initializer(LivenessInitializer::new(
LivenessConfig {
auto_ping_interval: Some(Duration::from_secs(config.auto_ping_interval)),
auto_ping_interval: Some(Duration::from_secs(config.metadata_auto_ping_interval)),
monitored_peers: sync_peers.clone(),
..Default::default()
},
Expand Down
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ pub async fn init_wallet(
Some(config.buffer_rate_limit_console_wallet),
Some(updater_config),
config.autoupdate_check_interval,
Some(config.contacts_auto_ping_interval),
);

let mut wallet = Wallet::start(
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl AppState {
},
};

let contact = Contact { alias, public_key };
let contact = Contact::new(alias, public_key, None, None);
inner.wallet.contacts_service.upsert_contact(contact).await?;

inner.refresh_contacts_state().await?;
Expand Down
2 changes: 2 additions & 0 deletions base_layer/p2p/src/proto/liveness.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ enum MetadataKey {
MetadataKeyNone = 0;
// The value for this key contains chain metadata
MetadataKeyChainMetadata = 1;
// The value for this key contains empty data
MetadataKeyContactsLiveness = 2;
}
24 changes: 24 additions & 0 deletions base_layer/p2p/src/services/liveness/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub enum LivenessRequest {
GetNetworkAvgLatency,
/// Set the metadata attached to each ping/pong message
SetMetadataEntry(MetadataKey, Vec<u8>),
/// Add a monitored peer to the basic config
AddMonitoredPeer(NodeId),
/// Remove a monitored peer from the basic config
RemoveMonitoredPeer(NodeId),
}

/// Response type for `LivenessService`
Expand Down Expand Up @@ -153,6 +157,26 @@ impl LivenessHandle {
}
}

/// Add a monitored peer to the basic config if not present
pub async fn check_add_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
match self.handle.call(LivenessRequest::AddMonitoredPeer(node_id)).await?? {
LivenessResponse::Ok => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Remove a monitored peer from the basic config if present
pub async fn check_remove_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
match self
.handle
.call(LivenessRequest::RemoveMonitoredPeer(node_id))
.await??
{
LivenessResponse::Ok => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Retrieve the average latency for a given node
pub async fn get_avg_latency(&mut self, node_id: NodeId) -> Result<Option<Duration>, LivenessError> {
match self.handle.call(LivenessRequest::GetAvgLatency(node_id)).await?? {
Expand Down
6 changes: 6 additions & 0 deletions base_layer/p2p/src/services/liveness/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ impl LivenessMock {
SetMetadataEntry(_, _) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
AddMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
RemoveMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
}
}
}
23 changes: 20 additions & 3 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tari_comms_dht::{
};
use tari_service_framework::reply_channel::RequestContext;
use tari_shutdown::ShutdownSignal;
use tokio::{time, time::MissedTickBehavior};
use tokio::{sync::RwLock, time, time::MissedTickBehavior};
use tokio_stream::wrappers;

use super::{
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct LivenessService<THandleStream, TPingStream> {
outbound_messaging: OutboundMessageRequester,
event_publisher: LivenessEventSender,
shutdown_signal: ShutdownSignal,
monitored_peers: Arc<RwLock<Vec<NodeId>>>,
}

impl<TRequestStream, TPingStream> LivenessService<TRequestStream, TPingStream>
Expand All @@ -88,7 +89,8 @@ where
outbound_messaging,
event_publisher,
shutdown_signal,
config,
config: config.clone(),
monitored_peers: Arc::new(RwLock::new(config.monitored_peers)),
}
}

Expand Down Expand Up @@ -254,10 +256,25 @@ where
self.state.set_metadata_entry(key, value);
Ok(LivenessResponse::Ok)
},
AddMonitoredPeer(node_id) => {
let node_id_exists = { self.monitored_peers.read().await.iter().any(|val| val == &node_id) };
if !node_id_exists {
self.monitored_peers.write().await.push(node_id.clone());
}
Ok(LivenessResponse::Ok)
},
RemoveMonitoredPeer(node_id) => {
let node_id_exists = { self.monitored_peers.read().await.iter().position(|val| *val == node_id) };
if let Some(pos) = node_id_exists {
self.monitored_peers.write().await.swap_remove(pos);
}
Ok(LivenessResponse::Ok)
},
}
}

async fn start_ping_round(&mut self) -> Result<(), LivenessError> {
let monitored_peers = { self.monitored_peers.read().await.clone() };
let selected_peers = self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand All @@ -267,7 +284,7 @@ where
.await?
.into_iter()
.map(|c| c.peer_node_id().clone())
.chain(self.config.monitored_peers.clone())
.chain(monitored_peers)
.collect::<Vec<_>>();

if selected_peers.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tempfile = "3.1.0"
thiserror = "1.0.26"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.4"
prost = "0.9"

[dependencies.tari_core]
path = "../../base_layer/core"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This migration is only meant for fresh databases on a testnet reset, so the down is not needed
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Copyright 2021. The Tari Project
--
-- Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
-- following conditions are met:
--
-- 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
-- disclaimer.
--
-- 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
-- following disclaimer in the documentation and/or other materials provided with the distribution.
--
-- 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
-- products derived from this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
-- INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-- SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-- WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
-- USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

PRAGMA foreign_keys=OFF;
DROP TABLE contacts;
CREATE TABLE contacts (
public_key BLOB PRIMARY KEY NOT NULL UNIQUE,
node_id BLOB NOT NULL UNIQUE,
alias TEXT NOT NULL,
last_seen DATETIME,
latency INTEGER
);
PRAGMA foreign_keys=ON;
3 changes: 3 additions & 0 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct WalletConfig {
pub base_node_service_config: BaseNodeServiceConfig,
pub updater_config: Option<AutoUpdateConfig>,
pub autoupdate_check_interval: Option<Duration>,
pub contacts_auto_ping_interval: u64,
}

impl WalletConfig {
Expand All @@ -59,6 +60,7 @@ impl WalletConfig {
rate_limit: Option<usize>,
updater_config: Option<AutoUpdateConfig>,
autoupdate_check_interval: Option<Duration>,
contacts_auto_ping_interval: Option<u64>,
) -> Self {
Self {
comms_config,
Expand All @@ -71,6 +73,7 @@ impl WalletConfig {
base_node_service_config: base_node_service_config.unwrap_or_default(),
updater_config,
autoupdate_check_interval,
contacts_auto_ping_interval: contacts_auto_ping_interval.unwrap_or(90),
}
}
}
2 changes: 0 additions & 2 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ impl WalletConnectivityService {
}

async fn check_connection(&mut self) {
debug!(target: LOG_TARGET, "HERE1");
match self.pools.as_ref() {
Some(pool) => {
debug!(target: LOG_TARGET, "HERE2");
if !pool.base_node_wallet_rpc_client.is_connected().await {
debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect...");
self.set_online_status(OnlineStatus::Offline);
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/contacts_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use diesel::result::Error as DieselError;
use tari_p2p::services::liveness::error::LivenessError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;

Expand All @@ -37,6 +38,8 @@ pub enum ContactsServiceError {
ContactsServiceStorageError(#[from] ContactsServiceStorageError),
#[error("Transport channel error: `{0}`")]
TransportChannelError(#[from] TransportChannelError),
#[error("Livenessl error: `{0}`")]
LivenessError(#[from] LivenessError),
}

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 4f09ee8

Please sign in to comment.