Skip to content

Commit

Permalink
fix: add decision step between header sync and pruned/archival (#3546)
Browse files Browse the repository at this point in the history
Description
---

- removes pruned metadata sync logic from listening state
- adds `DecideNextSync` step that is responsible for finding a suitable peer for block/pruned sync
- chain metadata service includes peer latency
- DRY'd up chain metadata service ping/pong event handling
- take latency into account when selecting a sync peer   
- header sync orders peers by ascending latency

Motivation and Context
---
When transitioning to header sync, the listening state would filter out peers based on whether they had a suitable horizon height.
However, any peer — regardless of its pruned height — can provide headers for header sync.

How Has This Been Tested?
---
Sync tests
Manually
  • Loading branch information
sdbondi authored Nov 9, 2021
1 parent 6d4d9d4 commit 23e868a
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ use tokio::sync::broadcast;
pub struct PeerChainMetadata {
node_id: NodeId,
chain_metadata: ChainMetadata,
latency: Option<u32>,
}

impl PeerChainMetadata {
pub fn new(node_id: NodeId, chain_metadata: ChainMetadata) -> Self {
pub fn new(node_id: NodeId, chain_metadata: ChainMetadata, latency: Option<u32>) -> Self {
Self {
node_id,
chain_metadata,
latency,
}
}

Expand All @@ -49,6 +51,10 @@ impl PeerChainMetadata {
pub fn claimed_chain_metadata(&self) -> &ChainMetadata {
&self.chain_metadata
}

pub fn latency(&self) -> Option<u32> {
self.latency
}
}

impl Display for PeerChainMetadata {
Expand Down
83 changes: 29 additions & 54 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityRequester},
message::MessageExt,
peer_manager::NodeId,
};
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, Metadata, MetadataKey};
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent};
use tokio::sync::broadcast;

const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3;
Expand Down Expand Up @@ -164,8 +163,10 @@ impl ChainMetadataService {
event.node_id
);
self.number_of_rounds_no_pings = 0;
self.collect_chain_state_from_ping(&event.node_id, &event.metadata)?;
self.send_chain_metadata_to_event_publisher().await?;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
}
},
// Received a pong, check if our neighbour sent it and it contains ChainMetadata
LivenessEvent::ReceivedPong(event) => {
Expand All @@ -175,8 +176,10 @@ impl ChainMetadataService {
event.node_id
);
self.number_of_rounds_no_pings = 0;
self.collect_chain_state_from_pong(&event.node_id, &event.metadata)?;
self.send_chain_metadata_to_event_publisher().await?;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
}
},
// New ping round has begun
LivenessEvent::PingRoundBroadcast(num_peers) => {
Expand Down Expand Up @@ -231,66 +234,35 @@ impl ChainMetadataService {
}
}

fn collect_chain_state_from_ping(
&mut self,
node_id: &NodeId,
metadata: &Metadata,
) -> Result<(), ChainMetadataSyncError> {
if let Some(chain_metadata_bytes) = metadata.get(MetadataKey::ChainMetadata) {
let chain_metadata = proto::ChainMetadata::decode(chain_metadata_bytes.as_slice())?;
let chain_metadata = ChainMetadata::try_from(chain_metadata)
.map_err(|err| ChainMetadataSyncError::ReceivedInvalidChainMetadata(node_id.clone(), err))?;
debug!(
target: LOG_TARGET,
"Received chain metadata from NodeId '{}' #{}, Acc_diff {}",
node_id,
chain_metadata.height_of_longest_chain(),
chain_metadata.accumulated_difficulty().to_formatted_string(&Locale::en),
);

if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
{
self.peer_chain_metadata.remove(pos);
}

self.peer_chain_metadata
.push(PeerChainMetadata::new(node_id.clone(), chain_metadata));
}
Ok(())
}

fn collect_chain_state_from_pong(
&mut self,
node_id: &NodeId,
metadata: &Metadata,
) -> Result<(), ChainMetadataSyncError> {
let chain_metadata_bytes = metadata
fn collect_chain_state_from_ping_pong(&mut self, event: &PingPongEvent) -> Result<(), ChainMetadataSyncError> {
let chain_metadata_bytes = event
.metadata
.get(MetadataKey::ChainMetadata)
.ok_or(ChainMetadataSyncError::NoChainMetadata)?;

let chain_metadata = ChainMetadata::try_from(proto::ChainMetadata::decode(chain_metadata_bytes.as_slice())?)
.map_err(|err| ChainMetadataSyncError::ReceivedInvalidChainMetadata(node_id.clone(), err))?;
.map_err(|err| ChainMetadataSyncError::ReceivedInvalidChainMetadata(event.node_id.clone(), err))?;
debug!(
target: LOG_TARGET,
"Received chain metadata from NodeId '{}' #{}, Acc_diff {}",
node_id,
event.node_id,
chain_metadata.height_of_longest_chain(),
chain_metadata.accumulated_difficulty().to_formatted_string(&Locale::en),
);

if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
.position(|peer_chainstate| *peer_chainstate.node_id() == event.node_id)
{
self.peer_chain_metadata.remove(pos);
}

self.peer_chain_metadata
.push(PeerChainMetadata::new(node_id.clone(), chain_metadata));
self.peer_chain_metadata.push(PeerChainMetadata::new(
event.node_id.clone(),
chain_metadata,
event.latency,
));
Ok(())
}
}
Expand All @@ -301,13 +273,17 @@ mod test {
use crate::base_node::comms_interface::{CommsInterfaceError, NodeCommsRequest, NodeCommsResponse};
use futures::StreamExt;
use std::convert::TryInto;
use tari_comms::test_utils::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_many_node_identities,
use tari_comms::{
peer_manager::NodeId,
test_utils::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_many_node_identities,
},
};
use tari_p2p::services::liveness::{
mock::{create_p2p_liveness_mock, LivenessMockState},
LivenessRequest,
Metadata,
PingPongEvent,
};
use tari_service_framework::reply_channel;
Expand Down Expand Up @@ -465,9 +441,8 @@ mod test {
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
let err = service.handle_liveness_event(&sample_event).await.unwrap_err();
unpack_enum!(ChainMetadataSyncError::NoChainMetadata = err);
assert_eq!(service.peer_chain_metadata.len(), 0);
service.handle_liveness_event(&sample_event).await.unwrap();
assert!(service.peer_chain_metadata.is_empty());
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,20 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(HeaderSync(_), HeadersSynchronized(conn)) => {
if self.config.pruning_horizon > 0 {
HorizonStateSync(states::HorizonStateSync::with_peer(conn))
} else {
BlockSync(states::BlockSync::with_peer(conn))
}
},
(Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()),
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue) => Listening(s.into()),
(HeaderSync(s), NetworkSilence) => Listening(s.into()),
(HeaderSync(s), Continue | NetworkSilence) => Listening(s.into()),
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peer)) => HorizonStateSync(peer.into()),
(DecideNextSync(s), Continue) => Listening(s.into()),
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),

(DecideNextSync(_), ProceedToBlockSync(peer)) => BlockSync(peer.into()),
(BlockSync(s), BlocksSynchronized) => Listening(s.into()),
(BlockSync(s), BlockSyncFailed) => Waiting(s.into()),
(Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()),
(Listening(_), FallenBehind(LaggingBehindHorizon(_, sync_peers))) => HeaderSync(sync_peers.into()),

(Waiting(s), Continue) => Listening(s.into()),
(_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)),
(_, UserQuit) => Shutdown(states::Shutdown::with_reason("Shutdown initiated by user".to_string())),
Expand Down Expand Up @@ -240,6 +238,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
match state {
Starting(s) => s.next_event(shared_state).await,
HeaderSync(s) => s.next_event(shared_state).await,
DecideNextSync(s) => s.next_event(shared_state).await,
HorizonStateSync(s) => s.next_event(shared_state).await,
BlockSync(s) => s.next_event(shared_state).await,
Listening(s) => s.next_event(shared_state).await,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,27 @@ use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, HorizonStateSync, StateEvent, StateInfo, StatusInfo},
sync::BlockSynchronizer,
sync::{BlockSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
chain_storage::{BlockAddResult, BlockchainBackend},
};
use log::*;
use randomx_rs::RandomXFlag;
use std::time::Instant;
use tari_comms::PeerConnection;

const LOG_TARGET: &str = "c::bn::block_sync";

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct BlockSync {
sync_peer: Option<PeerConnection>,
sync_peer: SyncPeer,
is_synced: bool,
}

impl BlockSync {
pub fn new() -> Self {
Default::default()
}

pub fn with_peer(sync_peer: PeerConnection) -> Self {
pub fn new(sync_peer: SyncPeer) -> Self {
Self {
sync_peer: Some(sync_peer),
sync_peer,
is_synced: false,
}
}
Expand All @@ -62,7 +57,7 @@ impl BlockSync {
shared.config.block_sync_config.clone(),
shared.db.clone(),
shared.connectivity.clone(),
self.sync_peer.take(),
self.sync_peer.clone(),
shared.sync_validators.block_body.clone(),
);

Expand Down Expand Up @@ -122,7 +117,16 @@ impl BlockSync {
}

impl From<HorizonStateSync> for BlockSync {
fn from(_: HorizonStateSync) -> Self {
BlockSync::new()
fn from(sync: HorizonStateSync) -> Self {
BlockSync::new(sync.into_sync_peer())
}
}

impl From<SyncPeer> for BlockSync {
fn from(sync_peer: SyncPeer) -> Self {
Self {
sync_peer,
is_synced: false,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use crate::base_node::{
state_machine_service::states::{
BlockSync,
DecideNextSync,
HeaderSync,
HorizonStateSync,
Listening,
Expand All @@ -36,12 +37,13 @@ use crate::base_node::{
use randomx_rs::RandomXFlag;
use std::fmt::{Display, Error, Formatter};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{peer_manager::NodeId, PeerConnection};
use tari_comms::peer_manager::NodeId;

#[derive(Debug)]
pub enum BaseNodeState {
Starting(Starting),
HeaderSync(HeaderSync),
DecideNextSync(DecideNextSync),
HorizonStateSync(HorizonStateSync),
BlockSync(BlockSync),
// The best network chain metadata
Expand All @@ -54,8 +56,10 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
HeadersSynchronized(PeerConnection),
HeadersSynchronized(SyncPeer),
HeaderSyncFailed,
ProceedToHorizonSync(SyncPeer),
ProceedToBlockSync(SyncPeer),
HorizonStateSynchronized,
HorizonStateSyncFailure,
BlocksSynchronized,
Expand All @@ -81,8 +85,6 @@ impl<E: std::error::Error> From<E> for StateEvent {
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, Vec<SyncPeer>),
// We are behind the pruning horizon.
LaggingBehindHorizon(ChainMetadata, Vec<SyncPeer>),
UpToDate,
}

Expand All @@ -107,13 +109,6 @@ impl Display for SyncStatus {
m.height_of_longest_chain(),
m.accumulated_difficulty(),
),
LaggingBehindHorizon(m, v) => write!(
f,
"Lagging behind pruning horizon ({} peer(s), Network height: #{}, Difficulty: {})",
v.len(),
m.height_of_longest_chain(),
m.accumulated_difficulty(),
),
UpToDate => f.write_str("UpToDate"),
}
}
Expand All @@ -123,18 +118,20 @@ impl Display for StateEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
use StateEvent::*;
match self {
Initialized => f.write_str("Initialized"),
BlocksSynchronized => f.write_str("Synchronised Blocks"),
HeadersSynchronized(conn) => write!(f, "Headers Synchronized from peer `{}`", conn.peer_node_id()),
HeaderSyncFailed => f.write_str("Header Synchronization Failed"),
HorizonStateSynchronized => f.write_str("Horizon State Synchronized"),
HorizonStateSyncFailure => f.write_str("Horizon State Synchronization Failed"),
BlockSyncFailed => f.write_str("Block Synchronization Failed"),
Initialized => write!(f, "Initialized"),
BlocksSynchronized => write!(f, "Synchronised Blocks"),
HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
HorizonStateSyncFailure => write!(f, "Horizon State Synchronization Failed"),
BlockSyncFailed => write!(f, "Block Synchronization Failed"),
FallenBehind(s) => write!(f, "Fallen behind main chain - {}", s),
NetworkSilence => f.write_str("Network Silence"),
Continue => f.write_str("Continuing"),
NetworkSilence => write!(f, "Network Silence"),
Continue => write!(f, "Continuing"),
FatalError(e) => write!(f, "Fatal Error - {}", e),
UserQuit => f.write_str("User Termination"),
UserQuit => write!(f, "User Termination"),
}
}
}
Expand All @@ -145,6 +142,7 @@ impl Display for BaseNodeState {
let s = match self {
Starting(_) => "Initializing",
HeaderSync(_) => "Synchronizing block headers",
DecideNextSync(_) => "Deciding next sync",
HorizonStateSync(_) => "Synchronizing horizon state",
BlockSync(_) => "Synchronizing blocks",
Listening(_) => "Listening",
Expand Down
Loading

0 comments on commit 23e868a

Please sign in to comment.