diff --git a/prdoc/pr_5450.prdoc b/prdoc/pr_5450.prdoc new file mode 100644 index 000000000000..ccd319cad246 --- /dev/null +++ b/prdoc/pr_5450.prdoc @@ -0,0 +1,18 @@ +title: Sync status refactoring + +doc: + - audience: Node Dev + description: | + `SyncingService` API in `sc-network-sync` has changed with some of the redundant methods related to sync status + removed that were mostly used internally or for testing purposes and is unlikely to impact external code. + `ExtendedPeerInfo` now has working `Clone` and `Copy` implementation. + +crates: + - name: sc-informant + bump: major + - name: sc-network-sync + bump: major + - name: sc-network-test + bump: major + - name: sc-service + bump: major diff --git a/substrate/client/informant/src/display.rs b/substrate/client/informant/src/display.rs index f5e042103ea7..2decd7674782 100644 --- a/substrate/client/informant/src/display.rs +++ b/substrate/client/informant/src/display.rs @@ -65,11 +65,11 @@ impl InformantDisplay { info: &ClientInfo, net_status: NetworkStatus, sync_status: SyncStatus, + num_connected_peers: usize, ) { let best_number = info.chain.best_number; let best_hash = info.chain.best_hash; let finalized_number = info.chain.finalized_number; - let num_connected_peers = sync_status.num_connected_peers; let speed = speed::(best_number, self.last_number, self.last_update); let total_bytes_inbound = net_status.total_bytes_inbound; let total_bytes_outbound = net_status.total_bytes_outbound; diff --git a/substrate/client/informant/src/lib.rs b/substrate/client/informant/src/lib.rs index d44364539a29..0b0e13dc08bb 100644 --- a/substrate/client/informant/src/lib.rs +++ b/substrate/client/informant/src/lib.rs @@ -24,7 +24,7 @@ use futures_timer::Delay; use log::{debug, info, trace}; use sc_client_api::{BlockchainEvents, UsageProvider}; use sc_network::NetworkStatusProvider; -use sc_network_sync::SyncStatusProvider; +use sc_network_sync::{SyncStatusProvider, SyncingService}; use sp_blockchain::HeaderMetadata; use sp_runtime::traits::{Block as BlockT, Header}; use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration}; @@ -37,10 +37,9 @@ fn interval(duration: Duration) -> impl Stream + Unpin { } /// Builds the informant and returns a `Future` that drives the informant. -pub async fn build(client: Arc, network: N, syncing: S) +pub async fn build(client: Arc, network: N, syncing: Arc>) where N: NetworkStatusProvider, - S: SyncStatusProvider, C: UsageProvider + HeaderMetadata + BlockchainEvents, >::Error: Display, { @@ -52,13 +51,14 @@ where .filter_map(|_| async { let net_status = network.status().await; let sync_status = syncing.status().await; + let num_connected_peers = syncing.num_connected_peers(); - match (net_status.ok(), sync_status.ok()) { - (Some(net), Some(sync)) => Some((net, sync)), + match (net_status, sync_status) { + (Ok(net), Ok(sync)) => Some((net, sync, num_connected_peers)), _ => None, } }) - .for_each(move |(net_status, sync_status)| { + .for_each(move |(net_status, sync_status, num_connected_peers)| { let info = client_1.usage_info(); if let Some(ref usage) = info.usage { trace!(target: "usage", "Usage statistics: {}", usage); @@ -68,7 +68,7 @@ where "Usage statistics not displayed as backend does not provide it", ) } - display.display(&info, net_status, sync_status); + display.display(&info, net_status, sync_status, num_connected_peers); future::ready(()) }); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 616dd3168872..4a57d61df457 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -615,7 +615,6 @@ where } // Update atomic variables - self.num_connected.store(self.peers.len(), Ordering::Relaxed); self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed); // Process actions requested by a syncing strategy. @@ -761,25 +760,11 @@ where ); }, ToServiceCommand::Status(tx) => { - let mut status = self.strategy.status(); - status.num_connected_peers = self.peers.len() as u32; - let _ = tx.send(status); + let _ = tx.send(self.strategy.status()); }, ToServiceCommand::NumActivePeers(tx) => { let _ = tx.send(self.num_active_peers()); }, - ToServiceCommand::SyncState(tx) => { - let _ = tx.send(self.strategy.status()); - }, - ToServiceCommand::BestSeenBlock(tx) => { - let _ = tx.send(self.strategy.status().best_seen_block); - }, - ToServiceCommand::NumSyncPeers(tx) => { - let _ = tx.send(self.strategy.status().num_peers); - }, - ToServiceCommand::NumQueuedBlocks(tx) => { - let _ = tx.send(self.strategy.status().queued_blocks); - }, ToServiceCommand::NumDownloadedBlocks(tx) => { let _ = tx.send(self.strategy.num_downloaded_blocks()); }, @@ -787,11 +772,8 @@ where let _ = tx.send(self.strategy.num_sync_requests()); }, ToServiceCommand::PeersInfo(tx) => { - let peers_info = self - .peers - .iter() - .map(|(peer_id, peer)| (*peer_id, peer.info.clone())) - .collect(); + let peers_info = + self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect(); let _ = tx.send(peers_info); }, ToServiceCommand::OnBlockFinalized(hash, header) => @@ -867,6 +849,7 @@ where if let Some(metrics) = &self.metrics { metrics.peers.dec(); } + self.num_connected.fetch_sub(1, Ordering::AcqRel); if self.important_peers.contains(&peer_id) { log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected"); @@ -1046,6 +1029,7 @@ where if let Some(metrics) = &self.metrics { metrics.peers.inc(); } + self.num_connected.fetch_add(1, Ordering::AcqRel); } self.peer_store_handle.set_peer_role(&peer_id, status.roles.into()); diff --git a/substrate/client/network/sync/src/service/syncing_service.rs b/substrate/client/network/sync/src/service/syncing_service.rs index f4bc58afd4fd..08a2b36118a9 100644 --- a/substrate/client/network/sync/src/service/syncing_service.rs +++ b/substrate/client/network/sync/src/service/syncing_service.rs @@ -50,10 +50,6 @@ pub enum ToServiceCommand { EventStream(TracingUnboundedSender), Status(oneshot::Sender>), NumActivePeers(oneshot::Sender), - SyncState(oneshot::Sender>), - BestSeenBlock(oneshot::Sender>>), - NumSyncPeers(oneshot::Sender), - NumQueuedBlocks(oneshot::Sender), NumDownloadedBlocks(oneshot::Sender), NumSyncRequests(oneshot::Sender), PeersInfo(oneshot::Sender)>>), @@ -83,6 +79,11 @@ impl SyncingService { Self { tx, num_connected, is_major_syncing } } + /// Get the number of peers known to `SyncingEngine` (both full and light). + pub fn num_connected_peers(&self) -> usize { + self.num_connected.load(Ordering::Relaxed) + } + /// Get the number of active peers. pub async fn num_active_peers(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -91,30 +92,6 @@ impl SyncingService { rx.await } - /// Get best seen block. - pub async fn best_seen_block(&self) -> Result>, oneshot::Canceled> { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(ToServiceCommand::BestSeenBlock(tx)); - - rx.await - } - - /// Get the number of sync peers. - pub async fn num_sync_peers(&self) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncPeers(tx)); - - rx.await - } - - /// Get the number of queued blocks. - pub async fn num_queued_blocks(&self) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(ToServiceCommand::NumQueuedBlocks(tx)); - - rx.await - } - /// Get the number of downloaded blocks. pub async fn num_downloaded_blocks(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -149,11 +126,11 @@ impl SyncingService { /// Get sync status /// /// Returns an error if `SyncingEngine` has terminated. - pub async fn status(&self) -> Result, ()> { + pub async fn status(&self) -> Result, oneshot::Canceled> { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx)); - rx.await.map_err(|_| ()) + rx.await } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 49e97ab11c37..21e474048625 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -438,7 +438,6 @@ where state: sync_state, best_seen_block, num_peers: self.peers.len() as u32, - num_connected_peers: 0u32, queued_blocks: self.queue_blocks.len() as u32, state_sync: self.state_sync.as_ref().map(|s| s.progress()), warp_sync: warp_sync_progress, diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index ff229863a68b..6f06f238fe3a 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -340,7 +340,6 @@ impl StateStrategy { }, best_seen_block: Some(self.state_sync.target_number()), num_peers: self.peers.len().saturated_into(), - num_connected_peers: self.peers.len().saturated_into(), queued_blocks: 0, state_sync: Some(self.state_sync.progress()), warp_sync: None, diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 4cc3e9f3a68b..99405c2e5f08 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -576,7 +576,6 @@ where Phase::Complete => None, }, num_peers: self.peers.len().saturated_into(), - num_connected_peers: self.peers.len().saturated_into(), queued_blocks: 0, state_sync: None, warp_sync: Some(self.progress()), diff --git a/substrate/client/network/sync/src/types.rs b/substrate/client/network/sync/src/types.rs index e8b8c8900360..c3403fe1e5f7 100644 --- a/substrate/client/network/sync/src/types.rs +++ b/substrate/client/network/sync/src/types.rs @@ -39,7 +39,7 @@ pub struct PeerInfo { } /// Info about a peer's known state (both full and light). -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ExtendedPeerInfo { /// Roles pub roles: Roles, @@ -49,6 +49,17 @@ pub struct ExtendedPeerInfo { pub best_number: NumberFor, } +impl Clone for ExtendedPeerInfo +where + B: BlockT, +{ + fn clone(&self) -> Self { + Self { roles: self.roles, best_hash: self.best_hash, best_number: self.best_number } + } +} + +impl Copy for ExtendedPeerInfo where B: BlockT {} + /// Reported sync state. #[derive(Clone, Eq, PartialEq, Debug)] pub enum SyncState { @@ -76,8 +87,6 @@ pub struct SyncStatus { pub best_seen_block: Option>, /// Number of peers participating in syncing. pub num_peers: u32, - /// Number of peers known to `SyncingEngine` (both full and light). - pub num_connected_peers: u32, /// Number of blocks queued for import pub queued_blocks: u32, /// State sync status in progress, if any. diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index b5aeb162e9fa..f84f353fb4a0 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -266,7 +266,7 @@ where /// Returns the number of peers we're connected to. pub async fn num_peers(&self) -> usize { - self.sync_service.status().await.unwrap().num_connected_peers as usize + self.sync_service.num_connected_peers() } /// Returns the number of downloaded blocks. @@ -1016,7 +1016,7 @@ pub trait TestNetFactory: Default + Sized + Send { for peer in peers { if peer.sync_service.is_major_syncing() || - peer.sync_service.num_queued_blocks().await.unwrap() != 0 + peer.sync_service.status().await.unwrap().queued_blocks != 0 { return false } @@ -1036,7 +1036,7 @@ pub trait TestNetFactory: Default + Sized + Send { async fn is_idle(&mut self) -> bool { let peers = self.peers_mut(); for peer in peers { - if peer.sync_service.num_queued_blocks().await.unwrap() != 0 { + if peer.sync_service.status().await.unwrap().queued_blocks != 0 { return false } if peer.sync_service.num_sync_requests().await.unwrap() != 0 { @@ -1094,9 +1094,7 @@ pub trait TestNetFactory: Default + Sized + Send { 'outer: loop { for sync_service in &sync_services { - if sync_service.status().await.unwrap().num_connected_peers as usize != - num_peers - 1 - { + if sync_service.num_connected_peers() != num_peers - 1 { futures::future::poll_fn::<(), _>(|cx| { self.poll(cx); Poll::Ready(()) diff --git a/substrate/client/network/test/src/sync.rs b/substrate/client/network/test/src/sync.rs index 9edd68c2ed9f..4244c49bf7fb 100644 --- a/substrate/client/network/test/src/sync.rs +++ b/substrate/client/network/test/src/sync.rs @@ -1049,7 +1049,7 @@ async fn syncs_all_forks_from_single_peer() { }) .await; - if net.peer(1).sync_service().best_seen_block().await.unwrap() == Some(12) { + if net.peer(1).sync_service().status().await.unwrap().best_seen_block == Some(12) { break } } diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index ed108a3102bc..5fb304edd7e1 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -341,7 +341,7 @@ pub async fn build_system_rpc_future< sc_rpc::system::Request::SyncState(sender) => { use sc_rpc::system::SyncState; - match sync_service.best_seen_block().await { + match sync_service.status().await.map(|status| status.best_seen_block) { Ok(best_seen_block) => { let best_number = client.info().best_number; let _ = sender.send(SyncState {