diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index ab5ac039bf..ce097096f8 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -20,7 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashMap, mem, time::Duration}; +use std::{ + cmp::{max, min}, + collections::HashMap, + mem, + time::Duration, +}; use log::*; use tari_comms::{ @@ -34,7 +39,7 @@ use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSync use tokio::{ sync::{mpsc, oneshot, watch}, time, - time::MissedTickBehavior, + time::{timeout, Duration as TokioDuration, MissedTickBehavior}, }; use crate::{ @@ -297,7 +302,10 @@ impl WalletConnectivityService { } else { return; }; + let mut loop_count = 0; + let number_of_seeds = peer_manager.get_state().1.len(); loop { + loop_count += 1; let node_id = if let Some(_time) = peer_manager.time_since_last_connection_attempt() { if peer_manager.get_current_peer().node_id == peer_manager.get_next_peer().node_id { // If we only have one peer in the list, wait a bit before retrying @@ -321,7 +329,10 @@ impl WalletConnectivityService { peer_manager.time_since_last_connection_attempt() ); self.pools.remove(&node_id); - match self.try_setup_rpc_pool(node_id.clone()).await { + match self + .try_setup_rpc_pool(node_id.clone(), loop_count / number_of_seeds + 1) + .await + { Ok(true) => { if self.peer_list_change_detected(&peer_manager) { debug!( @@ -391,13 +402,29 @@ impl WalletConnectivityService { self.online_status_watch.send(status); } - async fn try_setup_rpc_pool(&mut self, peer_node_id: NodeId) -> Result { - let conn = match self.try_dial_peer(peer_node_id.clone()).await? { - Some(c) => c, - None => { + async fn try_setup_rpc_pool( + &mut self, + peer_node_id: NodeId, + dial_cycle: usize, + ) -> Result { + // dial_timeout: 1 = 1s, 2 = 10s, 3 = 20s, 4 = 30s, 5 = 40s, 6 = 50s, 7 = 60s, 8 = 70s, 9 = 80s, 10 = 90s + let dial_timeout = TokioDuration::from_secs(min((max(1, 10 * (dial_cycle.saturating_sub(1)))) as u64, 90)); + trace!(target: LOG_TARGET, "Attempt dial with client timeout {:?}", dial_timeout); + let conn = match timeout(dial_timeout, self.try_dial_peer(peer_node_id.clone())).await { + Ok(Ok(Some(c))) => c, + Ok(Ok(None)) => { warn!(target: LOG_TARGET, "Could not dial base node peer '{}'", peer_node_id); return Ok(false); }, + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(WalletConnectivityError::ConnectivityError( + ConnectivityError::ClientCancelled(format!( + "Could not connect to '{}' in {:?}", + peer_node_id, dial_timeout + )), + )); + }, }; debug!( target: LOG_TARGET, diff --git a/comms/core/src/connectivity/error.rs b/comms/core/src/connectivity/error.rs index acb554ac79..5dbfe2849d 100644 --- a/comms/core/src/connectivity/error.rs +++ b/comms/core/src/connectivity/error.rs @@ -43,6 +43,8 @@ pub enum ConnectivityError { OnlineWaitTimeout(usize), #[error("Pending dial was cancelled")] DialCancelled, + #[error("Client cancelled: '{0}'")] + ClientCancelled(String), } impl From for ConnectivityError {