Skip to content

Commit

Permalink
feat: allow multiple initial sync peers (#5890)
Browse files Browse the repository at this point in the history
Description
---
Allow syncing from multiple peers initially

Motivation and Context
---
See issue: #5852 
This allows a node to wait until it gets 5 ChainMetaData during initial
sync until it goes to header sync.

How Has This Been Tested?
---
Unit tests


Fixes: #5852
  • Loading branch information
SWvheerden authored Nov 1, 2023
1 parent a3d7cf7 commit e1c504a
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::{
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::listening";
const INITIAL_SYNC_PEER_COUNT: usize = 5;

/// This struct contains the info of the peer, and is used to serialised and deserialised.
#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -117,6 +118,8 @@ impl Listening {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
let mut time_since_better_block = None;
let mut initial_sync_counter = 0;
let mut initial_sync_peer_list = Vec::new();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
loop {
Expand Down Expand Up @@ -177,7 +180,7 @@ impl Listening {
};
log_mdc::extend(mdc.clone());

let sync_mode = determine_sync_mode(
let mut sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
&local_metadata,
peer_metadata,
Expand All @@ -203,11 +206,11 @@ impl Listening {
.map(|t| t.elapsed() > shared.config.time_before_considered_lagging)
.unwrap()
{
return StateEvent::FallenBehind(SyncStatus::Lagging {
sync_mode = SyncStatus::Lagging {
local: local.clone(),
network: network.clone(),
sync_peers: sync_peers.clone(),
});
};
}
} else {
// We might have gotten up to date via propagation outside of this state, so reset the timer
Expand All @@ -216,15 +219,56 @@ impl Listening {
}
}

if sync_mode.is_lagging() {
return StateEvent::FallenBehind(sync_mode);
}

if !self.is_synced && sync_mode.is_up_to_date() {
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
debug!(target: LOG_TARGET, "Initial sync achieved");
}

// If we have already reached initial sync before, as indicated by the `is_synced` flagged we can
// immediately return fallen behind with the peer that has a higher pow than us
if sync_mode.is_lagging() && self.is_synced {
return StateEvent::FallenBehind(sync_mode);
}
// if we are lagging and not yet reached initial sync, we delay a bit till we get
// INITIAL_SYNC_PEER_COUNT metadata updates from peers to ensure we make a better choice of which
// peer to sync from in the next stages
if let SyncStatus::Lagging {
local,
network,
sync_peers,
} = sync_mode
{
initial_sync_counter += 1;
for peer in sync_peers {
let mut found = false;
// lets search the list list to ensure we only have unique peers in the list with the latest
// up-to-date information
for initial_peer in &mut initial_sync_peer_list {
// we compare the two peers via the comparison operator on syncpeer
if *initial_peer == peer {
found = true;
// if the peer is already in the list, we replace all the information about the peer
// with the newest up-to-date information
*initial_peer = peer.clone();
break;
}
}
if !found {
initial_sync_peer_list.push(peer.clone());
}
}
// We use a list here to ensure that we dont wait for even for INITIAL_SYNC_PEER_COUNT different
// peers
if initial_sync_counter >= INITIAL_SYNC_PEER_COUNT {
// lets return now that we have enough peers to chose from
return StateEvent::FallenBehind(SyncStatus::Lagging {
local,
network,
sync_peers: initial_sync_peer_list,
});
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!(target: LOG_TARGET, "Metadata event subscriber lagged by {} item(s)", n);
Expand Down Expand Up @@ -267,8 +311,10 @@ impl From<BlockSync> for Listening {
}

impl From<DecideNextSync> for Listening {
fn from(_: DecideNextSync) -> Self {
Self { is_synced: false }
fn from(sync: DecideNextSync) -> Self {
Self {
is_synced: sync.is_synced(),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::cmp::Ordering;

use log::*;

use crate::{
Expand All @@ -40,9 +38,14 @@ const LOG_TARGET: &str = "c::bn::state_machine_service::states::sync_decide";
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DecideNextSync {
sync_peers: Vec<SyncPeer>,
is_synced: bool,
}

impl DecideNextSync {
pub fn is_synced(&self) -> bool {
self.is_synced
}

pub async fn next_event<B: BlockchainBackend + 'static>(&mut self, shared: &BaseNodeStateMachine<B>) -> StateEvent {
use StateEvent::{Continue, FatalError, ProceedToBlockSync, ProceedToHorizonSync};
let local_metadata = match shared.db.get_chain_metadata().await {
Expand Down Expand Up @@ -121,55 +124,9 @@ impl DecideNextSync {

impl From<HeaderSyncState> for DecideNextSync {
fn from(sync: HeaderSyncState) -> Self {
sync.into_sync_peers().into()
}
}

impl From<Vec<SyncPeer>> for DecideNextSync {
fn from(mut sync_peers: Vec<SyncPeer>) -> Self {
sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) {
(None, None) => Ordering::Equal,
// No latency goes to the end
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(Some(la), Some(lb)) => la.cmp(&lb),
});
Self { sync_peers }
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use rand::{rngs::OsRng, seq::SliceRandom};
use tari_common_types::chain_metadata::ChainMetadata;

use super::*;

mod sort_by_latency {
use super::*;
use crate::base_node::chain_metadata_service::PeerChainMetadata;

#[test]
fn it_sorts_by_latency() {
let peers = (0..10)
.map(|i| {
PeerChainMetadata::new(
Default::default(),
ChainMetadata::empty(),
Some(Duration::from_millis(i)),
)
.into()
})
.chain(Some(
PeerChainMetadata::new(Default::default(), ChainMetadata::empty(), None).into(),
))
.collect::<Vec<SyncPeer>>();
let mut shuffled = peers.clone();
shuffled.shuffle(&mut OsRng);
let decide = DecideNextSync::from(shuffled);
assert_eq!(decide.sync_peers, peers);
}
let is_synced = sync.is_synced();
let mut sync_peers = sync.into_sync_peers();
sync_peers.sort();
DecideNextSync { sync_peers, is_synced }
}
}
65 changes: 65 additions & 0 deletions base_layer/core/src/base_node/sync/sync_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
cmp::Ordering,
fmt::{Display, Formatter},
time::Duration,
};
Expand Down Expand Up @@ -97,3 +98,67 @@ impl PartialEq for SyncPeer {
}
}
impl Eq for SyncPeer {}

impl Ord for SyncPeer {
fn cmp(&self, other: &Self) -> Ordering {
let mut result = self
.peer_metadata
.claimed_chain_metadata()
.accumulated_difficulty()
.cmp(&other.peer_metadata.claimed_chain_metadata().accumulated_difficulty());
if result == Ordering::Equal {
match (self.latency(), other.latency()) {
(None, None) => result = Ordering::Equal,
// No latency goes to the end
(Some(_), None) => result = Ordering::Less,
(None, Some(_)) => result = Ordering::Greater,
(Some(la), Some(lb)) => result = la.cmp(&lb),
}
}
result
}
}

impl PartialOrd for SyncPeer {
fn partial_cmp(&self, other: &SyncPeer) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use rand::{rngs::OsRng, seq::SliceRandom};
use tari_common_types::chain_metadata::ChainMetadata;

use super::*;

mod sort_by_latency {
use tari_comms::types::{CommsPublicKey, CommsSecretKey};
use tari_crypto::keys::{PublicKey, SecretKey};

use super::*;
use crate::base_node::chain_metadata_service::PeerChainMetadata;

#[test]
fn it_sorts_by_latency() {
let peers = (0..10)
.map(|i| {
let sk = CommsSecretKey::random(&mut OsRng);
let pk = CommsPublicKey::from_secret_key(&sk);
let node_id = NodeId::from_key(&pk);
PeerChainMetadata::new(node_id, ChainMetadata::empty(), Some(Duration::from_millis(i))).into()
})
.chain(Some(
PeerChainMetadata::new(Default::default(), ChainMetadata::empty(), None).into(),
))
.collect::<Vec<SyncPeer>>();
let mut shuffled = peers.clone();
shuffled.shuffle(&mut OsRng);
assert_ne!(shuffled, peers);
shuffled.sort();
assert_eq!(shuffled, peers);
}
}
}
Loading

0 comments on commit e1c504a

Please sign in to comment.