fix(sync): adds extra checks for sync stream termination #3927

Merged
17 changes: 10 additions & 7 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
@@ -188,6 +188,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
mut client: rpc::BaseNodeSyncRpcClient,
max_latency: Duration,
) -> Result<(), BlockSyncError> {
info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer);
self.hooks.call_on_starting_hook();

let tip_header = self.db.fetch_last_header().await?;
@@ -320,7 +321,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.await?;

// Average time between receiving blocks from the peer - used to detect a slow sync peer
let last_avg_latency = avg_latency.calculate_average();
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(latency) = last_avg_latency {
sync_peer.set_latency(latency);
}
@@ -342,12 +343,14 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
block.accumulated_data().accumulated_sha_difficulty,
latency
);
if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

current_block = Some(block);
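What the change above does: instead of erroring as soon as a single block takes longer than `max_latency`, the block (and, below, header) sync loop now keeps a rolling average of recent response times and only aborts once that average, computed from at least 5 samples, exceeds the limit. A minimal, self-contained sketch of the pattern follows; `RollingAvg`, `check_peer_latency`, and the string error are simplified stand-ins for the crate's `RollingAverageTime` and `BlockSyncError::MaxLatencyExceeded`, not the actual API.

```rust
use std::{collections::VecDeque, time::Duration};

/// Simplified stand-in for `common::rolling_avg::RollingAverageTime`.
struct RollingAvg {
    window: usize,
    samples: VecDeque<Duration>,
}

impl RollingAvg {
    fn new(window: usize) -> Self {
        Self {
            window,
            samples: VecDeque::with_capacity(window),
        }
    }

    /// Keep at most `window` of the most recent samples.
    fn add_sample(&mut self, sample: Duration) {
        if self.samples.len() == self.window {
            self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    /// Return the average only once enough samples have been collected.
    fn calculate_average_with_min_samples(&self, min_samples: usize) -> Option<Duration> {
        if self.samples.is_empty() || self.samples.len() < min_samples {
            return None;
        }
        let total: Duration = self.samples.iter().sum();
        Some(total / self.samples.len() as u32)
    }
}

/// Reject the peer only when the *average* round-trip time exceeds the limit.
fn check_peer_latency(avg: &RollingAvg, max_latency: Duration) -> Result<(), String> {
    if let Some(avg_latency) = avg.calculate_average_with_min_samples(5) {
        if avg_latency > max_latency {
            return Err(format!(
                "average latency {avg_latency:?} exceeds maximum {max_latency:?}"
            ));
        }
    }
    Ok(())
}

fn main() {
    let mut avg = RollingAvg::new(20);
    let max_latency = Duration::from_secs(2);

    // A single slow block (4 s) is tolerated because fewer than 5 samples exist.
    avg.add_sample(Duration::from_secs(4));
    assert!(check_peer_latency(&avg, max_latency).is_ok());

    // Consistently slow responses eventually trip the check.
    for _ in 0..4 {
        avg.add_sample(Duration::from_secs(4));
    }
    assert!(check_peer_latency(&avg, max_latency).is_err());
}
```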
19 changes: 13 additions & 6 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
@@ -42,6 +42,7 @@ use crate::{
base_node::sync::{hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer},
blocks::{BlockHeader, ChainBlock, ChainHeader},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proof_of_work::randomx_factory::RandomXFactory,
proto::{
@@ -553,6 +554,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
split_info: ChainSplitInfo,
max_latency: Duration,
) -> Result<(), BlockHeaderSyncError> {
info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
const COMMIT_EVERY_N_HEADERS: usize = 1000;

let mut has_switched_to_new_chain = false;
@@ -626,8 +628,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let mut last_sync_timer = Instant::now();

let mut last_total_accumulated_difficulty = 0;
let mut avg_latency = RollingAverageTime::new(20);
while let Some(header) = header_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
debug!(
target: LOG_TARGET,
@@ -672,12 +676,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
self.hooks
.call_on_progress_header_hooks(current_height, split_info.remote_tip_height, &sync_peer);

if latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency,
});
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockHeaderSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency: avg_latency,
max_latency,
});
}
}

last_sync_timer = Instant::now();
@@ -32,7 +32,7 @@ use croaring::Bitmap;
use futures::{stream::FuturesUnordered, StreamExt};
use log::*;
use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
use tari_comms::connectivity::ConnectivityRequester;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_crypto::{
commitment::HomomorphicCommitment,
tari_utilities::{hex::Hex, Hashable},
@@ -58,6 +58,7 @@ use crate::{
MmrTree,
PrunedOutput,
},
common::rolling_avg::RollingAverageTime,
consensus::ConsensusManager,
proto::base_node::{
sync_utxo as proto_sync_utxo,
@@ -236,6 +237,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting kernel sync from peer {}", sync_peer);
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

let remote_num_kernels = to_header.kernel_mmr_size;
@@ -288,8 +290,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut mmr_position = local_num_kernels;
let end = remote_num_kernels;
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);
while let Some(kernel) = kernel_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
kernel
.verify_signature()
@@ -368,13 +372,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
@@ -387,12 +385,27 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Ok(())
}

fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) {
if avg_latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: peer.clone(),
latency: avg_latency,
max_latency: self.max_latency,
});
}
}

Ok(())
}

async fn synchronize_outputs(
&mut self,
mut sync_peer: SyncPeer,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
info!(target: LOG_TARGET, "Starting output sync from peer {}", sync_peer);
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let remote_num_outputs = to_header.output_mmr_size;
@@ -466,9 +479,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);
let mut constants = self.rules.consensus_constants(current_header.height()).clone();
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);

while let Some(response) = output_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;

if res.mmr_index != 0 && res.mmr_index != mmr_position {
@@ -658,13 +673,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
self.hooks.call_on_progress_horizon_hooks(info);
}

if latency > self.max_latency {
return Err(HorizonSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
max_latency: self.max_latency,
});
}
self.check_latency(sync_peer.node_id(), &avg_latency)?;

last_sync_timer = Instant::now();
}
40 changes: 35 additions & 5 deletions base_layer/core/src/base_node/sync/rpc/service.rs
@@ -203,6 +203,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match blocks {
Ok(blocks) if blocks.is_empty() => {
break;
@@ -226,6 +234,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

// Ensure task stops if the peer prematurely stops their RPC session
if utils::mpsc::send_all(&tx, blocks).await.is_err() {
debug!(
target: LOG_TARGET,
"Block sync session for peer '{}' terminated early", peer_node_id
);
break;
}
},
@@ -288,7 +300,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
task::spawn(
async move {
// Move token into this task
let session_token = session_token;
let peer_node_id = session_token;
let iter = NonOverlappingIntegerPairIter::new(
start_header.height + 1,
start_header.height.saturating_add(count).saturating_add(1),
@@ -304,6 +316,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Header sync session for peer '{}' terminated early", peer_node_id
);
break;
}
match headers {
Ok(headers) if headers.is_empty() => {
break;
@@ -325,7 +344,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Header sync round complete for peer `{}`.", session_token,
"Header sync round complete for peer `{}`.", peer_node_id,
);
}
.instrument(span),
@@ -453,6 +472,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

let session_token = self.try_add_exclusive_session(peer_node_id).await?;
task::spawn(async move {
// Move session token into task
let peer_node_id = session_token;
while current_height <= end_height {
if tx.is_closed() {
break;
@@ -461,6 +482,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.fetch_kernels_in_block(current_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Kernel sync session for peer '{}' terminated early", peer_node_id
);
break;
}

match res {
Ok(kernels) if kernels.is_empty() => {
let _ = tx
@@ -525,7 +555,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
"Kernel sync round complete for peer `{}`.", session_token,
"Kernel sync round complete for peer `{}`.", peer_node_id,
);
});
Ok(Streaming::new(rx))
@@ -546,9 +576,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
req.include_deleted_bitmaps
);

let _session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let session_token = self.try_add_exclusive_session(peer_node_id.clone()).await?;
let (tx, rx) = mpsc::channel(200);
let task = SyncUtxosTask::new(self.db());
let task = SyncUtxosTask::new(self.db(), session_token);
task.run(request, tx).await?;

Ok(Streaming::new(rx))
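The service-side half of the fix is visible in the hunks above: after each potentially slow database fetch, the task re-checks `tx.is_closed()` and logs an early termination instead of continuing to produce data for a client that has already dropped the RPC session. Below is a hedged sketch of that producer-loop shape using tokio's `mpsc`; the fetch function, item type, and logging are placeholders rather than the node's real RPC service code.

```rust
use tokio::sync::mpsc;

/// Placeholder for an expensive blockchain fetch (e.g. blocks, headers, kernels).
async fn fetch_next_batch(height: u64) -> Vec<u64> {
    // Pretend each batch holds a single item keyed by height.
    vec![height]
}

/// Stream batches to a client until the range is exhausted or the client goes away.
async fn stream_to_client(mut height: u64, end: u64, tx: mpsc::Sender<u64>) {
    while height <= end {
        // Bail out early if the receiver was dropped while we were idle.
        if tx.is_closed() {
            eprintln!("sync session terminated early (before fetch)");
            break;
        }

        let batch = fetch_next_batch(height).await;

        // The fetch can take a while; re-check before sending so we don't waste
        // work (or error noisily) on a session the peer has already closed.
        if tx.is_closed() {
            eprintln!("sync session terminated early (after fetch)");
            break;
        }

        for item in batch {
            if tx.send(item).await.is_err() {
                // The receiver hung up mid-send; stop the task.
                return;
            }
        }
        height += 1;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    tokio::spawn(stream_to_client(0, 5, tx));
    while let Some(item) = rx.recv().await {
        println!("received {item}");
    }
}
```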
35 changes: 28 additions & 7 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
@@ -24,6 +24,7 @@ use std::{sync::Arc, time::Instant};

use log::*;
use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
utils,
};
@@ -42,21 +43,21 @@ const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";

pub(crate) struct SyncUtxosTask<B> {
db: AsyncBlockchainDb<B>,
peer_node_id: Arc<NodeId>,
}

impl<B> SyncUtxosTask<B>
where B: BlockchainBackend + 'static
{
pub(crate) fn new(db: AsyncBlockchainDb<B>) -> Self {
Self { db }
pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
Self { db, peer_node_id }
}

pub(crate) async fn run(
self,
request: Request<SyncUtxosRequest>,
mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let peer = request.context().peer_node_id().clone();
let msg = request.into_message();
let start_header = self
.db
@@ -105,7 +106,10 @@ where B: BlockchainBackend + 'static
let include_pruned_utxos = msg.include_pruned_utxos;
let include_deleted_bitmaps = msg.include_deleted_bitmaps;
task::spawn(async move {
debug!(target: LOG_TARGET, "Starting UTXO stream for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"Starting UTXO stream for peer '{}'", self.peer_node_id
);
if let Err(err) = self
.start_streaming(
&mut tx,
@@ -118,10 +122,16 @@ where B: BlockchainBackend + 'static
)
.await
{
debug!(target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", peer, err);
debug!(
target: LOG_TARGET,
"UTXO stream errored for peer '{}': {}", self.peer_node_id, err
);
let _ = tx.send(Err(err)).await;
}
debug!(target: LOG_TARGET, "UTXO stream completed for peer '{}'", peer);
debug!(
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
metrics::active_sync_peers().dec();
});

@@ -178,7 +188,10 @@ where B: BlockchainBackend + 'static
let end = current_header.output_mmr_size;

if tx.is_closed() {
debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

@@ -196,6 +209,14 @@ where B: BlockchainBackend + 'static
current_header.height,
deleted_diff.cardinality(),
);
if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
break;
}

let utxos = utxos
.into_iter()
.skip(skip_outputs as usize)
7 changes: 7 additions & 0 deletions base_layer/core/src/common/rolling_avg.rs
@@ -59,4 +59,11 @@ impl RollingAverageTime {
u64::try_from(total_time.as_nanos()).unwrap_or(u64::MAX) / self.samples.len() as u64,
))
}

pub fn calculate_average_with_min_samples(&self, min_samples: usize) -> Option<Duration> {
if self.samples.len() < min_samples {
return None;
}
self.calculate_average()
}
}
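A sketch of a unit test that could accompany the new method (not part of this diff), showing the intended behaviour: `None` is returned until at least `min_samples` samples have been recorded, after which it defers to `calculate_average()`. It uses only the `new(window)` and `add_sample` calls already shown in the hunks above.

```rust
#[cfg(test)]
mod min_samples_test {
    use std::time::Duration;

    use super::RollingAverageTime;

    #[test]
    fn average_requires_min_samples() {
        let mut avg = RollingAverageTime::new(20);
        avg.add_sample(Duration::from_millis(800));
        avg.add_sample(Duration::from_millis(900));

        // Fewer than 5 samples: no average, so callers make no latency decision yet.
        assert!(avg.calculate_average_with_min_samples(5).is_none());

        for _ in 0..3 {
            avg.add_sample(Duration::from_millis(1_000));
        }

        // With 5 samples the average becomes available and can be compared to a limit.
        assert!(avg.calculate_average_with_min_samples(5).is_some());
    }
}
```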