diff --git a/applications/tari_base_node/src/bootstrap.rs b/applications/tari_base_node/src/bootstrap.rs index 4bf2c5f053..1c16b11ca0 100644 --- a/applications/tari_base_node/src/bootstrap.rs +++ b/applications/tari_base_node/src/bootstrap.rs @@ -36,6 +36,7 @@ use tari_core::{ state_machine_service::{initializer::BaseNodeStateMachineInitializer, states::HorizonSyncConfig}, BaseNodeStateMachineConfig, BlockSyncConfig, + LocalNodeCommsInterface, StateMachineHandle, }, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, BlockchainDatabase}, @@ -211,6 +212,7 @@ where B: BlockchainBackend + 'static config: &GlobalConfig, ) -> UnspawnedCommsNode { let dht = handles.expect_handle::(); + let base_node_service = handles.expect_handle::(); let builder = RpcServer::builder(); let builder = match config.rpc_max_simultaneous_sessions { Some(limit) => builder.with_maximum_simultaneous_sessions(limit), @@ -228,7 +230,10 @@ where B: BlockchainBackend + 'static // Add your RPC services here ‍🏴‍☠️️☮️🌊 let rpc_server = rpc_server .add_service(dht.rpc_service()) - .add_service(base_node::create_base_node_sync_rpc_service(db.clone())) + .add_service(base_node::create_base_node_sync_rpc_service( + db.clone(), + base_node_service, + )) .add_service(mempool::create_mempool_rpc_service( handles.expect_handle::(), )) diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 5e3894e57b..4a91805ab9 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -408,6 +408,15 @@ where T: BlockchainBackend + 'static ) -> Result<(), CommsInterfaceError> { let NewBlock { block_hash } = new_block; + if self.blockchain_db.inner().is_add_block_disabled() { + info!( + target: LOG_TARGET, + "Ignoring block message ({}) because add_block is locked", + block_hash.to_hex() + ); + return Ok(()); + } + // Only a single block request can complete at a time. // As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for // the same full block. The first request that succeeds will stop the node from requesting the block from any diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index 5a0f2a062f..4ec0f8b757 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -34,7 +34,16 @@ use crate::{ comms_interface::LocalNodeCommsInterface, state_machine_service::{ states, - states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus}, + states::{ + BaseNodeState, + HeaderSyncState, + HorizonSyncConfig, + StateEvent, + StateInfo, + StatusInfo, + SyncPeerConfig, + SyncStatus, + }, }, sync::{BlockSyncConfig, SyncValidators}, }, @@ -137,22 +146,51 @@ impl BaseNodeStateMachine { /// Describe the Finite State Machine for the base node. This function describes _every possible_ state /// transition for the node given its current state and an event that gets triggered. pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState { + let db = self.db.inner(); use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*}; match (state, event) { (Starting(s), Initialized) => Listening(s.into()), - (Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()), - (HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()), - (HeaderSync(s), Continue | NetworkSilence) => Listening(s.into()), + ( + Listening(_), + FallenBehind(Lagging { + local: local_metadata, + sync_peers, + .. + }), + ) => { + db.set_disable_add_block_flag(); + HeaderSync(HeaderSyncState::new(sync_peers, local_metadata)) + }, + (HeaderSync(s), HeaderSyncFailed) => { + db.clear_disable_add_block_flag(); + Waiting(s.into()) + }, + (HeaderSync(s), Continue | NetworkSilence) => { + db.clear_disable_add_block_flag(); + Listening(s.into()) + }, (HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()), (DecideNextSync(_), ProceedToHorizonSync(peer)) => HorizonStateSync(peer.into()), - (DecideNextSync(s), Continue) => Listening(s.into()), + (DecideNextSync(s), Continue) => { + db.clear_disable_add_block_flag(); + Listening(s.into()) + }, (HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()), - (HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()), + (HorizonStateSync(s), HorizonStateSyncFailure) => { + db.clear_disable_add_block_flag(); + Waiting(s.into()) + }, (DecideNextSync(_), ProceedToBlockSync(peer)) => BlockSync(peer.into()), - (BlockSync(s), BlocksSynchronized) => Listening(s.into()), - (BlockSync(s), BlockSyncFailed) => Waiting(s.into()), + (BlockSync(s), BlocksSynchronized) => { + db.clear_disable_add_block_flag(); + Listening(s.into()) + }, + (BlockSync(s), BlockSyncFailed) => { + db.clear_disable_add_block_flag(); + Waiting(s.into()) + }, (Waiting(s), Continue) => Listening(s.into()), (_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)), diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index ac3374dab8..1f796fec87 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -30,7 +30,7 @@ use crate::base_node::{ state_machine_service::states::{ BlockSync, DecideNextSync, - HeaderSync, + HeaderSyncState, HorizonStateSync, Listening, ListeningInfo, @@ -44,7 +44,7 @@ use crate::base_node::{ #[derive(Debug)] pub enum BaseNodeState { Starting(Starting), - HeaderSync(HeaderSync), + HeaderSync(HeaderSyncState), DecideNextSync(DecideNextSync), HorizonStateSync(HorizonStateSync), BlockSync(BlockSync), @@ -86,7 +86,11 @@ impl From for StateEvent { #[derive(Debug, Clone, PartialEq)] pub enum SyncStatus { // We are behind the chain tip. - Lagging(ChainMetadata, Vec), + Lagging { + local: ChainMetadata, + network: ChainMetadata, + sync_peers: Vec, + }, UpToDate, } @@ -104,12 +108,14 @@ impl Display for SyncStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { use SyncStatus::*; match self { - Lagging(m, v) => write!( + Lagging { + network, sync_peers, .. + } => write!( f, "Lagging behind {} peers (#{}, Difficulty: {})", - v.len(), - m.height_of_longest_chain(), - m.accumulated_difficulty(), + sync_peers.len(), + network.height_of_longest_chain(), + network.accumulated_difficulty(), ), UpToDate => f.write_str("UpToDate"), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs index 2171f8c221..aa96623174 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs @@ -23,11 +23,12 @@ use std::{cmp::Ordering, time::Instant}; use log::*; +use tari_common_types::chain_metadata::ChainMetadata; use crate::{ base_node::{ comms_interface::BlockEvent, - state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo}, + state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo}, sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer}, BaseNodeStateMachine, }, @@ -36,14 +37,15 @@ use crate::{ const LOG_TARGET: &str = "c::bn::header_sync"; -#[derive(Clone, Debug, Default)] -pub struct HeaderSync { +#[derive(Clone, Debug)] +pub struct HeaderSyncState { sync_peers: Vec, is_synced: bool, + local_metadata: ChainMetadata, } -impl HeaderSync { - pub fn new(mut sync_peers: Vec) -> Self { +impl HeaderSyncState { + pub fn new(mut sync_peers: Vec, local_metadata: ChainMetadata) -> Self { // Sort by latency lowest to highest sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) { (None, None) => Ordering::Equal, @@ -55,6 +57,7 @@ impl HeaderSync { Self { sync_peers, is_synced: false, + local_metadata, } } @@ -77,6 +80,7 @@ impl HeaderSync { shared.connectivity.clone(), &self.sync_peers, shared.randomx_factory.clone(), + &self.local_metadata, ); let status_event_sender = shared.status_event_sender.clone(); @@ -141,14 +145,3 @@ impl HeaderSync { } } } - -impl From for HeaderSync { - fn from(_: Listening) -> Self { - Default::default() - } -} -impl From> for HeaderSync { - fn from(peers: Vec) -> Self { - Self::new(peers) - } -} diff --git a/base_layer/core/src/base_node/state_machine_service/states/listening.rs b/base_layer/core/src/base_node/state_machine_service/states/listening.rs index 94ce74e507..e319f6d7c8 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/listening.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/listening.rs @@ -40,7 +40,7 @@ use crate::{ states::{ BlockSync, DecideNextSync, - HeaderSync, + HeaderSyncState, StateEvent, StateEvent::FatalError, StateInfo, @@ -195,7 +195,7 @@ impl Listening { if self.is_synced && best_metadata.height_of_longest_chain() == local.height_of_longest_chain() + 1 && time_since_better_block - .map(|ts: Instant| ts.elapsed() < Duration::from_secs(30)) + .map(|ts: Instant| ts.elapsed() < Duration::from_secs(60)) .unwrap_or(true) { if time_since_better_block.is_none() { @@ -217,7 +217,7 @@ impl Listening { peer_metadata_list }; - let local = match shared.db.get_chain_metadata().await { + let local_metadata = match shared.db.get_chain_metadata().await { Ok(m) => m, Err(e) => { return FatalError(format!("Could not get local blockchain metadata. {}", e)); @@ -227,7 +227,7 @@ impl Listening { let sync_mode = determine_sync_mode( shared.config.blocks_behind_before_considered_lagging, - &local, + &local_metadata, best_metadata, sync_peers, ); @@ -266,8 +266,8 @@ impl From for Listening { } } -impl From for Listening { - fn from(sync: HeaderSync) -> Self { +impl From for Listening { + fn from(sync: HeaderSyncState) -> Self { Self { is_synced: sync.is_synced(), } @@ -356,12 +356,15 @@ fn determine_sync_mode( return UpToDate; }; - let sync_peers = sync_peers.into_iter().cloned().collect(); debug!( target: LOG_TARGET, "Lagging (local height = {}, network height = {})", local_tip_height, network_tip_height ); - Lagging(network.clone(), sync_peers) + Lagging { + local: local.clone(), + network: network.clone(), + sync_peers: sync_peers.into_iter().cloned().collect(), + } } else { info!( target: LOG_TARGET, @@ -497,28 +500,28 @@ mod test { let network = ChainMetadata::new(0, Vec::new(), 0, 0, 500_001); match determine_sync_mode(0, &local, &network, vec![]) { - SyncStatus::Lagging(n, _) => assert_eq!(n, network), + SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network), _ => panic!(), } let local = ChainMetadata::new(100, Vec::new(), 50, 50, 500_000); let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001); match determine_sync_mode(0, &local, &network, vec![]) { - SyncStatus::Lagging(n, _) => assert_eq!(n, network), + SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network), _ => panic!(), } let local = ChainMetadata::new(0, Vec::new(), 50, 50, 500_000); let network = ChainMetadata::new(100, Vec::new(), 0, 0, 500_001); match determine_sync_mode(0, &local, &network, vec![]) { - SyncStatus::Lagging(n, _) => assert_eq!(n, network), + SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network), _ => panic!(), } let local = ChainMetadata::new(99, Vec::new(), 50, 50, 500_000); let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001); match determine_sync_mode(0, &local, &network, vec![]) { - SyncStatus::Lagging(n, _) => assert_eq!(n, network), + SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network), _ => panic!(), } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/mod.rs b/base_layer/core/src/base_node/state_machine_service/states/mod.rs index c111149f0b..9dc8ba30b0 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/mod.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/mod.rs @@ -67,7 +67,7 @@ pub(crate) mod helpers; pub use helpers::SyncPeerConfig; mod header_sync; -pub use header_sync::HeaderSync; +pub use header_sync::HeaderSyncState; mod sync_decide; pub use sync_decide::DecideNextSync; diff --git a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs index a56f7a5fba..950d3dbcc7 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/sync_decide.rs @@ -25,7 +25,7 @@ use log::*; use crate::{ base_node::{ state_machine_service::{ - states::{HeaderSync, StateEvent}, + states::{HeaderSyncState, StateEvent}, BaseNodeStateMachine, }, sync::SyncPeer, @@ -118,8 +118,8 @@ fn find_best_latency<'a, I: IntoIterator>(iter: I) -> Optio .cloned() } -impl From for DecideNextSync { - fn from(sync: HeaderSync) -> Self { +impl From for DecideNextSync { + fn from(sync: HeaderSyncState) -> Self { Self { sync_peers: sync.into_sync_peers(), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/waiting.rs b/base_layer/core/src/base_node/state_machine_service/states/waiting.rs index 38383678bf..8f2df4551e 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/waiting.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/waiting.rs @@ -25,7 +25,7 @@ use std::time::Duration; use log::info; use tokio::time::sleep; -use crate::base_node::state_machine_service::states::{BlockSync, HeaderSync, HorizonStateSync, StateEvent}; +use crate::base_node::state_machine_service::states::{BlockSync, HeaderSyncState, HorizonStateSync, StateEvent}; const LOG_TARGET: &str = "c::bn::state_machine_service::states::waiting"; @@ -68,8 +68,8 @@ impl From for Waiting { } } -impl From for Waiting { - fn from(_: HeaderSync) -> Self { +impl From for Waiting { + fn from(_: HeaderSyncState) -> Self { Default::default() } } diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index ab26ff6588..bbde2db28d 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -191,7 +191,10 @@ impl BlockSynchronizer { .fetch_chain_header_by_block_hash(block.hash.clone()) .await? .ok_or_else(|| { - BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into()) + BlockSyncError::ProtocolViolation(format!( + "Peer sent hash ({}) for block header we do not have", + block.hash.to_hex() + )) })?; let current_height = header.height(); diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index de54877cf0..cd13af0399 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -28,7 +28,7 @@ use std::{ use futures::StreamExt; use log::*; -use tari_common_types::types::HashOutput; +use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput}; use tari_comms::{ connectivity::ConnectivityRequester, peer_manager::NodeId, @@ -63,6 +63,7 @@ pub struct HeaderSynchronizer<'a, B> { connectivity: ConnectivityRequester, sync_peers: &'a [SyncPeer], hooks: Hooks, + local_metadata: &'a ChainMetadata, } impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { @@ -73,6 +74,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { connectivity: ConnectivityRequester, sync_peers: &'a [SyncPeer], randomx_factory: RandomXFactory, + local_metadata: &'a ChainMetadata, ) -> Self { Self { config, @@ -81,6 +83,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { connectivity, sync_peers, hooks: Default::default(), + local_metadata, } } @@ -247,11 +250,28 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { ); Ok(()) } else { + // Check if the metadata that we had when we decided to enter header sync is behind the peer's + // claimed one. If so, our chain has updated in the meantime and the sync peer + // is behaving. + if self.local_metadata.accumulated_difficulty() <= + sync_peer.claimed_chain_metadata().accumulated_difficulty() + { + debug!( + target: LOG_TARGET, + "Local blockchain received a better block through propagation at height {} (was: {}). \ + Proceeding to archival/pruned block sync", + metadata.height_of_longest_chain(), + self.local_metadata.height_of_longest_chain() + ); + return Ok(()); + } debug!( target: LOG_TARGET, - "Headers and block state are already in-sync (Header Tip: {}, Block tip: {})", + "Headers and block state are already in-sync (Header Tip: {}, Block tip: {}, Peer's height: \ + {})", header_tip_height, - metadata.height_of_longest_chain() + metadata.height_of_longest_chain(), + sync_peer.claimed_chain_metadata().height_of_longest_chain(), ); Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata { claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(), @@ -364,13 +384,16 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { fork_hash_index, tip_height: remote_tip_height, } = resp; - debug!( - target: LOG_TARGET, - "Found split {} blocks back, received {} headers from peer `{}`", - steps_back, - headers.len(), - sync_peer - ); + + if steps_back > 0 { + debug!( + target: LOG_TARGET, + "Found chain split {} blocks back, received {} headers from peer `{}`", + steps_back, + headers.len(), + sync_peer + ); + } if fork_hash_index >= block_hashes.len() as u64 { let _ = self @@ -658,7 +681,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { // Check that the remote tip is stronger than the local tip let proposed_tip = chain_headers.last().unwrap(); - self.header_validator.compare_chains(current_tip, proposed_tip).is_lt() + self.header_validator.compare_chains(current_tip, proposed_tip).is_le() } async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> { diff --git a/base_layer/core/src/base_node/sync/rpc/mod.rs b/base_layer/core/src/base_node/sync/rpc/mod.rs index 2f540636be..8d063c52a0 100644 --- a/base_layer/core/src/base_node/sync/rpc/mod.rs +++ b/base_layer/core/src/base_node/sync/rpc/mod.rs @@ -36,7 +36,10 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_comms_rpc_macros::tari_rpc; #[cfg(feature = "base_node")] -use crate::chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}; +use crate::{ + base_node::LocalNodeCommsInterface, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, +}; use crate::{ proto, proto::base_node::{ @@ -95,6 +98,7 @@ pub trait BaseNodeSyncService: Send + Sync + 'static { #[cfg(feature = "base_node")] pub fn create_base_node_sync_rpc_service( db: AsyncBlockchainDb, + base_node_service: LocalNodeCommsInterface, ) -> BaseNodeSyncRpcServer> { - BaseNodeSyncRpcServer::new(BaseNodeSyncRpcService::new(db)) + BaseNodeSyncRpcServer::new(BaseNodeSyncRpcService::new(db, base_node_service)) } diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 2c9f87a1e1..528ce946e8 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -39,8 +39,12 @@ use tokio::{ use tracing::{instrument, span, Instrument, Level}; use crate::{ - base_node::sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService}, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, + base_node::{ + comms_interface::BlockEvent, + sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService}, + LocalNodeCommsInterface, + }, + chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend}, iterators::NonOverlappingIntegerPairIter, proto, proto::base_node::{ @@ -60,13 +64,15 @@ const LOG_TARGET: &str = "c::base_node::sync_rpc"; pub struct BaseNodeSyncRpcService { db: AsyncBlockchainDb, active_sessions: RwLock>>, + base_node_service: LocalNodeCommsInterface, } impl BaseNodeSyncRpcService { - pub fn new(db: AsyncBlockchainDb) -> Self { + pub fn new(db: AsyncBlockchainDb, base_node_service: LocalNodeCommsInterface) -> Self { Self { db, active_sessions: RwLock::new(Vec::new()), + base_node_service, } } @@ -101,6 +107,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ ) -> Result, RpcStatus> { let peer_node_id = request.context().peer_node_id().clone(); let message = request.into_message(); + let mut block_event_stream = self.base_node_service.get_block_event_stream(); let db = self.db(); let start_header = db @@ -114,16 +121,16 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ .await .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; - let start = start_header.height + 1; - if start < metadata.pruned_height() { + let start_height = start_header.height + 1; + if start_height < metadata.pruned_height() { return Err(RpcStatus::bad_request(format!( "Requested full block body at height {}, however this node has an effective pruned height of {}", - start, + start_height, metadata.pruned_height() ))); } - if start > metadata.height_of_longest_chain() { + if start_height > metadata.height_of_longest_chain() { return Ok(Streaming::empty()); } @@ -133,17 +140,17 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ .map_err(RpcStatus::log_internal_error(LOG_TARGET))? .ok_or_else(|| RpcStatus::not_found("Requested end block sync hash was not found"))?; - let end = end_header.height; - if start > end { + let end_height = end_header.height; + if start_height > end_height { return Err(RpcStatus::bad_request(format!( "Start block #{} is higher than end block #{}", - start, end + start_height, end_height ))); } debug!( target: LOG_TARGET, - "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start, end, + "Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start_height, end_height, ); let session_token = self.try_add_exclusive_session(peer_node_id).await?; @@ -155,16 +162,41 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ task::spawn( async move { // Move token into this task - let session_token = session_token; - let iter = NonOverlappingIntegerPairIter::new(start, end + 1, BATCH_SIZE); + let peer_node_id = session_token; + let iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, BATCH_SIZE); for (start, end) in iter { if tx.is_closed() { break; } + // Check for reorgs during sync + while let Ok(block_event) = block_event_stream.try_recv() { + if let BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { removed, .. }, _) = + &*block_event + { + if let Some(reorg_block) = removed + .iter() + // If the reorg happens before the end height of sync we let the peer know that the chain they are syncing with has changed + .find(|block| block.height() <= end_height) + { + warn!( + target: LOG_TARGET, + "Block reorg detected at height {} during sync, letting the sync peer {} know.", + reorg_block.height(), + peer_node_id + ); + let _ = tx.send(Err(RpcStatus::conflict(format!( + "Reorg at height {} detected", + reorg_block.height() + )))); + return; + } + } + } + debug!( target: LOG_TARGET, - "Sending blocks #{} - #{} to '{}'", start, end, session_token + "Sending blocks #{} - #{} to '{}'", start, end, peer_node_id ); let blocks = db .fetch_blocks(start..=end) @@ -198,7 +230,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ debug!( target: LOG_TARGET, - "Block sync round complete for peer `{}`.", session_token, + "Block sync round complete for peer `{}`.", peer_node_id, ); } .instrument(span), diff --git a/base_layer/core/src/base_node/sync/rpc/tests.rs b/base_layer/core/src/base_node/sync/rpc/tests.rs index 6f343aa4f1..288bfd1b9d 100644 --- a/base_layer/core/src/base_node/sync/rpc/tests.rs +++ b/base_layer/core/src/base_node/sync/rpc/tests.rs @@ -22,12 +22,14 @@ use futures::StreamExt; use tari_comms::protocol::rpc::{mock::RpcRequestMock, RpcStatusCode}; +use tari_service_framework::reply_channel; use tari_test_utils::{streams::convert_mpsc_to_stream, unpack_enum}; use tempfile::{tempdir, TempDir}; +use tokio::sync::broadcast; use super::BaseNodeSyncRpcService; use crate::{ - base_node::BaseNodeSyncService, + base_node::{BaseNodeSyncService, LocalNodeCommsInterface}, chain_storage::BlockchainDatabase, proto::base_node::{SyncBlocksRequest, SyncUtxosRequest}, test_helpers::{ @@ -47,7 +49,13 @@ fn setup() -> ( let request_mock = RpcRequestMock::new(peer_manager); let db = create_new_blockchain(); - let service = BaseNodeSyncRpcService::new(db.clone().into()); + let (req_tx, _) = reply_channel::unbounded(); + let (block_tx, _) = reply_channel::unbounded(); + let (block_event_tx, _) = broadcast::channel(1); + let service = BaseNodeSyncRpcService::new( + db.clone().into(), + LocalNodeCommsInterface::new(req_tx, block_tx, block_event_tx), + ); (service, db, request_mock, tmp) } diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index f98cbb3502..67ff5324b8 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -27,7 +27,7 @@ use std::{ convert::TryFrom, mem, ops::{Bound, RangeBounds}, - sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{atomic, atomic::AtomicBool, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, time::Instant, }; @@ -185,6 +185,7 @@ pub struct BlockchainDatabase { config: BlockchainDatabaseConfig, consensus_manager: ConsensusManager, difficulty_calculator: Arc, + disable_add_block_flag: Arc, } #[allow(clippy::ptr_arg)] @@ -208,6 +209,7 @@ where B: BlockchainBackend config, consensus_manager, difficulty_calculator: Arc::new(difficulty_calculator), + disable_add_block_flag: Arc::new(AtomicBool::new(false)), }; if is_empty { info!(target: LOG_TARGET, "Blockchain db is empty. Adding genesis block."); @@ -286,6 +288,18 @@ where B: BlockchainBackend }) } + pub(crate) fn is_add_block_disabled(&self) -> bool { + self.disable_add_block_flag.load(atomic::Ordering::Acquire) + } + + pub(crate) fn set_disable_add_block_flag(&self) { + self.disable_add_block_flag.store(true, atomic::Ordering::Release); + } + + pub(crate) fn clear_disable_add_block_flag(&self) { + self.disable_add_block_flag.store(false, atomic::Ordering::Release); + } + pub fn write(&self, transaction: DbTransaction) -> Result<(), ChainStorageError> { let mut db = self.db_write_access()?; db.write(transaction) @@ -806,6 +820,16 @@ where B: BlockchainBackend /// /// If an error does occur while writing the new block parts, all changes are reverted before returning. pub fn add_block(&self, block: Arc) -> Result { + if self.is_add_block_disabled() { + warn!( + target: LOG_TARGET, + "add_block is disabled. Ignoring candidate block #{} ({})", + block.header.height, + block.hash().to_hex() + ); + return Err(ChainStorageError::AddBlockOperationLocked); + } + let new_height = block.header.height; // Perform orphan block validation. if let Err(e) = self.validators.orphan.validate(&block) { @@ -2175,6 +2199,7 @@ impl Clone for BlockchainDatabase { config: self.config, consensus_manager: self.consensus_manager.clone(), difficulty_calculator: self.difficulty_calculator.clone(), + disable_add_block_flag: self.disable_add_block_flag.clone(), } } } diff --git a/base_layer/core/src/chain_storage/error.rs b/base_layer/core/src/chain_storage/error.rs index 96c207fc5c..b44c8032ae 100644 --- a/base_layer/core/src/chain_storage/error.rs +++ b/base_layer/core/src/chain_storage/error.rs @@ -117,6 +117,8 @@ pub enum ChainStorageError { DatabaseResyncRequired(&'static str), #[error("Block error: {0}")] BlockError(#[from] BlockError), + #[error("Add block is currently locked. No blocks may be added using add_block until the flag is cleared.")] + AddBlockOperationLocked, } impl ChainStorageError { diff --git a/base_layer/core/tests/base_node_rpc.rs b/base_layer/core/tests/base_node_rpc.rs index 0a3ee467aa..6d0f13de71 100644 --- a/base_layer/core/tests/base_node_rpc.rs +++ b/base_layer/core/tests/base_node_rpc.rs @@ -28,7 +28,7 @@ use tari_common::configuration::Network; use tari_comms::protocol::rpc::mock::RpcRequestMock; use tari_core::{ base_node::{ - comms_interface::Broadcast, + comms_interface::{Broadcast, LocalNodeCommsInterface}, proto::wallet_rpc::{ TxLocation, TxQueryBatchResponse, @@ -57,8 +57,10 @@ use tari_core::{ txn_schema, }; use tari_crypto::tari_utilities::epoch_time::EpochTime; +use tari_service_framework::reply_channel; use tari_test_utils::streams::convert_mpsc_to_stream; use tempfile::{tempdir, TempDir}; +use tokio::sync::broadcast; use crate::helpers::{ block_builders::{chain_block, chain_block_with_new_coinbase, create_genesis_block_with_coinbase_value}, @@ -105,7 +107,11 @@ async fn setup() -> ( base_node.mempool_handle.clone(), base_node.state_machine_handle.clone(), ); - let base_node_service = BaseNodeSyncRpcService::new(base_node.blockchain_db.clone().into()); + let (req_tx, _) = reply_channel::unbounded(); + let (block_tx, _) = reply_channel::unbounded(); + let (block_event_tx, _) = broadcast::channel(1); + let local_nci = LocalNodeCommsInterface::new(req_tx, block_tx, block_event_tx); + let base_node_service = BaseNodeSyncRpcService::new(base_node.blockchain_db.clone().into(), local_nci); ( wallet_service, base_node_service, diff --git a/comms/src/protocol/rpc/status.rs b/comms/src/protocol/rpc/status.rs index 986d628565..43504ca982 100644 --- a/comms/src/protocol/rpc/status.rs +++ b/comms/src/protocol/rpc/status.rs @@ -99,6 +99,13 @@ impl RpcStatus { } } + pub fn conflict(details: T) -> Self { + Self { + code: RpcStatusCode::Conflict, + details: details.to_string(), + } + } + /// Returns a closure that logs the given error and returns a generic general error that does not leak any /// potentially sensitive error information. Use this function with map_err to catch "miscellaneous" errors. pub fn log_internal_error<'a, E: std::error::Error + 'a>(target: &'a str) -> impl Fn(E) -> Self + 'a { @@ -197,6 +204,8 @@ pub enum RpcStatusCode { ProtocolError = 8, /// RPC forbidden error Forbidden = 9, + /// RPC conflict error + Conflict = 10, // The following status represents anything that is not recognised (i.e not one of the above codes). /// Unrecognised RPC status code InvalidRpcStatusCode, @@ -238,6 +247,7 @@ impl From for RpcStatusCode { 7 => NotFound, 8 => ProtocolError, 9 => Forbidden, + 10 => Conflict, _ => InvalidRpcStatusCode, } } @@ -261,6 +271,7 @@ mod test { assert_eq!(RpcStatusCode::from(InvalidRpcStatusCode as u32), InvalidRpcStatusCode); assert_eq!(RpcStatusCode::from(ProtocolError as u32), ProtocolError); assert_eq!(RpcStatusCode::from(Forbidden as u32), Forbidden); + assert_eq!(RpcStatusCode::from(Conflict as u32), Conflict); assert_eq!(RpcStatusCode::from(123), InvalidRpcStatusCode); } }