Skip to content

Commit

Permalink
fix(sync): adds extra checks for sync stream termination
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Mar 17, 2022
1 parent 7145201 commit 4c24900
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 44 deletions.
17 changes: 10 additions & 7 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
mut client: rpc::BaseNodeSyncRpcClient,
max_latency: Duration,
) -> Result<(), BlockSyncError> {
info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer);
self.hooks.call_on_starting_hook();

let tip_header = self.db.fetch_last_header().await?;
Expand Down Expand Up @@ -320,7 +321,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.await?;

// Average time between receiving blocks from the peer - used to detect a slow sync peer
let last_avg_latency = avg_latency.calculate_average();
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(latency) = last_avg_latency {
sync_peer.set_latency(latency);
}
Expand All @@ -342,12 +343,14 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
block.accumulated_data().accumulated_sha_difficulty,
latency
);
if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

current_block = Some(block);
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct BlockchainSyncConfig {
impl Default for BlockchainSyncConfig {
fn default() -> Self {
Self {
initial_max_sync_latency: Duration::from_secs(10),
initial_max_sync_latency: Duration::from_millis(600),
max_latency_increase: Duration::from_secs(2),
ban_period: Duration::from_secs(30 * 60),
short_ban_period: Duration::from_secs(60),
Expand Down
19 changes: 13 additions & 6 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
base_node::sync::{hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer},
blocks::{BlockHeader, ChainBlock, ChainHeader},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proof_of_work::randomx_factory::RandomXFactory,
proto::{
Expand Down Expand Up @@ -540,6 +541,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
split_info: ChainSplitInfo,
max_latency: Duration,
) -> Result<(), BlockHeaderSyncError> {
info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
const COMMIT_EVERY_N_HEADERS: usize = 1000;

let mut has_switched_to_new_chain = false;
Expand Down Expand Up @@ -613,8 +615,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let mut last_sync_timer = Instant::now();

let mut last_total_accumulated_difficulty = 0;
let mut avg_latency = RollingAverageTime::new(20);
while let Some(header) = header_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -659,12 +663,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
self.hooks
.call_on_progress_header_hooks(current_height, split_info.remote_tip_height, &sync_peer);

if latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

last_sync_timer = Instant::now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use croaring::Bitmap;
use futures::{stream::FuturesUnordered, StreamExt};
use log::*;
use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
use tari_comms::connectivity::ConnectivityRequester;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_crypto::{
commitment::HomomorphicCommitment,
tari_utilities::{hex::Hex, Hashable},
Expand All @@ -58,6 +58,7 @@ use crate::{
MmrTree,
PrunedOutput,
},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proto::base_node::{
sync_utxo as proto_sync_utxo,
Expand Down Expand Up @@ -236,6 +237,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting kernel sync from peer {}", sync_peer);
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

let remote_num_kernels = to_header.kernel_mmr_size;
Expand Down Expand Up @@ -288,8 +290,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut mmr_position = local_num_kernels;
let end = remote_num_kernels;
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);
while let Some(kernel) = kernel_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
kernel
.verify_signature()
Expand Down Expand Up @@ -368,13 +372,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
Expand All @@ -387,12 +385,27 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Ok(())
}

/// Checks the rolling average latency of the current sync stream against `self.max_latency`.
///
/// The average is requested with a minimum of 5 samples; when no average is returned
/// (presumably because fewer than 5 samples have been recorded — confirm against
/// `RollingAverageTime`), the check passes.
///
/// # Errors
///
/// Returns `HorizonSyncError::MaxLatencyExceeded`, carrying a clone of `peer`, the computed
/// average and the configured maximum, when the average is strictly greater than
/// `self.max_latency`.
fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
    match avg_latency.calculate_average_with_min_samples(5) {
        // Only an available average that is strictly above the limit is an error.
        Some(average) if average > self.max_latency => Err(HorizonSyncError::MaxLatencyExceeded {
            peer: peer.clone(),
            latency: average,
            max_latency: self.max_latency,
        }),
        // Either no average is available yet, or it is within the limit.
        _ => Ok(()),
    }
}

async fn synchronize_outputs(
&mut self,
mut sync_peer: SyncPeer,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting output sync from peer {}", sync_peer);
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let remote_num_outputs = to_header.output_mmr_size;
Expand Down Expand Up @@ -466,9 +479,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);
let mut constants = self.rules.consensus_constants(current_header.height()).clone();
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);

while let Some(response) = output_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;

if res.mmr_index != 0 && res.mmr_index != mmr_position {
Expand Down Expand Up @@ -658,13 +673,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
Expand Down
40 changes: 35 additions & 5 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match blocks {
Ok(blocks) if blocks.is_empty() => {
break;
Expand All @@ -229,6 +237,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

// Ensure task stops if the peer prematurely stops their RPC session
if utils::mpsc::send_all(&tx, blocks).await.is_err() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}
},
Expand Down Expand Up @@ -294,7 +306,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
task::spawn(
async move {
// Move token into this task
let session_token = session_token;
let peer_node_id = session_token;
let iter = NonOverlappingIntegerPairIter::new(
start_header.height + 1,
start_header.height.saturating_add(count).saturating_add(1),
Expand All @@ -310,6 +322,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Header sync session for peer '{}' terminated early", peer_node_id
);
break;
}
match headers {
Ok(headers) if headers.is_empty() => {
break;
Expand All @@ -331,7 +350,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Header sync round complete for peer `{}`.", session_token,
"Header sync round complete for peer `{}`.", peer_node_id,
);
}
.instrument(span),
Expand Down Expand Up @@ -462,6 +481,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
task::spawn(async move {
// Move session token into task
let peer_node_id = session_token;
while current_height <= end_height {
if tx.is_closed() {
break;
Expand All @@ -470,6 +491,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.fetch_kernels_in_block(current_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Kernel sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match res {
Ok(kernels) if kernels.is_empty() => {
let _ = tx
Expand Down Expand Up @@ -534,7 +564,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Kernel sync round complete for peer `{}`.", session_token,
"Kernel sync round complete for peer `{}`.", peer_node_id,
);
});
Ok(Streaming::new(rx))
Expand All @@ -555,9 +585,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
req.include_deleted_bitmaps
);

let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let (tx, rx) = mpsc::channel(200);
let task = SyncUtxosTask::new(self.db());
let task = SyncUtxosTask::new(self.db(), session_token);
task.run(request, tx).await?;

Ok(Streaming::new(rx))
Expand Down
35 changes: 28 additions & 7 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{sync::Arc, time::Instant};

use log::*;
use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{Request, RpcStatus},
utils,
};
Expand All @@ -42,21 +43,21 @@ const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";

/// Task that serves a UTXO sync request by streaming `SyncUtxosResponse` items to a peer
/// through an mpsc channel (see `run`).
pub(crate) struct SyncUtxosTask<B> {
// Async handle to the blockchain database the task reads from.
db: AsyncBlockchainDb<B>,
// Node id of the peer being served; used in this task's log messages.
// NOTE(review): the RPC service appears to pass its exclusive-session token here, so holding
// it presumably keeps that session registered for the lifetime of the task — confirm.
peer_node_id: Arc<NodeId>,
}

impl<B> SyncUtxosTask<B>
where B: BlockchainBackend + 'static
{
pub(crate) fn new(db: AsyncBlockchainDb<B>) -> Self {
Self { db }
pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
Self { db, peer_node_id }
}

pub(crate) async fn run(
self,
request: Request<SyncUtxosRequest>,
mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let peer = request.context().peer_node_id().clone();
let msg = request.into_message();
let start_header = self
.db
Expand Down Expand Up @@ -105,7 +106,10 @@ where B: BlockchainBackend + 'static
let include_pruned_utxos = msg.include_pruned_utxos;
let include_deleted_bitmaps = msg.include_deleted_bitmaps;
task::spawn(async move {
debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"Starting UTXO stream for peer '{}'", self.peer_node_id
);
if let Err(err) = self
.start_streaming(
&mut tx,
Expand All @@ -118,10 +122,16 @@ where B: BlockchainBackend + 'static
)
.await
{
debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
debug!(
target: LOG_TARGET,
"UTXO stream errored for peer '{}': {}", self.peer_node_id, err
);
let _ = tx.send(Err(err)).await;
}
debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
metrics::active_sync_peers().dec();
});

Expand Down Expand Up @@ -178,7 +188,10 @@ where B: BlockchainBackend + 'static
let end = current_header.output_mmr_size;

if tx.is_closed() {
debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

Expand All @@ -196,6 +209,14 @@ where B: BlockchainBackend + 'static
current_header.height,
deleted_diff.cardinality(),
);
if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

let utxos = utxos
.into_iter()
.skip(skip_outputs as usize)
Expand Down
Loading

0 comments on commit 4c24900

Please sign in to comment.