Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync status refactoring #5450

Merged
merged 8 commits into from
Aug 26, 2024
Merged
18 changes: 18 additions & 0 deletions prdoc/pr_5450.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
title: Sync status refactoring

doc:
- audience: Node Dev
description: |
`SyncingService` API in `sc-network-sync` has changed with some of the redundant methods related to sync status
removed that were mostly used internally or for testing purposes and is unlikely to impact external code.
`ExtendedPeerInfo` now has working `Clone` and `Copy` implementation.

crates:
- name: sc-informant
bump: major
- name: sc-network-sync
bump: major
- name: sc-network-test
bump: major
- name: sc-service
bump: major
2 changes: 1 addition & 1 deletion substrate/client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ impl<B: BlockT> InformantDisplay<B> {
info: &ClientInfo<B>,
net_status: NetworkStatus,
sync_status: SyncStatus<B>,
num_connected_peers: usize,
) {
let best_number = info.chain.best_number;
let best_hash = info.chain.best_hash;
let finalized_number = info.chain.finalized_number;
let num_connected_peers = sync_status.num_connected_peers;
let speed = speed::<B>(best_number, self.last_number, self.last_update);
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
Expand Down
14 changes: 7 additions & 7 deletions substrate/client/informant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures_timer::Delay;
use log::{debug, info, trace};
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::NetworkStatusProvider;
use sc_network_sync::SyncStatusProvider;
use sc_network_sync::{SyncStatusProvider, SyncingService};
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
Expand All @@ -37,10 +37,9 @@ fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
}

/// Builds the informant and returns a `Future` that drives the informant.
pub async fn build<B: BlockT, C, N, S>(client: Arc<C>, network: N, syncing: S)
pub async fn build<B: BlockT, C, N>(client: Arc<C>, network: N, syncing: Arc<SyncingService<B>>)
where
N: NetworkStatusProvider,
S: SyncStatusProvider<B>,
C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
<C as HeaderMetadata<B>>::Error: Display,
{
Expand All @@ -52,13 +51,14 @@ where
.filter_map(|_| async {
let net_status = network.status().await;
let sync_status = syncing.status().await;
let num_connected_peers = syncing.num_connected_peers();
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved

match (net_status.ok(), sync_status.ok()) {
(Some(net), Some(sync)) => Some((net, sync)),
match (net_status, sync_status) {
(Ok(net), Ok(sync)) => Some((net, sync, num_connected_peers)),
_ => None,
}
})
.for_each(move |(net_status, sync_status)| {
.for_each(move |(net_status, sync_status, num_connected_peers)| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
trace!(target: "usage", "Usage statistics: {}", usage);
Expand All @@ -68,7 +68,7 @@ where
"Usage statistics not displayed as backend does not provide it",
)
}
display.display(&info, net_status, sync_status);
display.display(&info, net_status, sync_status, num_connected_peers);
future::ready(())
});

Expand Down
26 changes: 5 additions & 21 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ where
}

// Update atomic variables
self.num_connected.store(self.peers.len(), Ordering::Relaxed);
self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);

// Process actions requested by a syncing strategy.
Expand Down Expand Up @@ -761,37 +760,20 @@ where
);
},
ToServiceCommand::Status(tx) => {
let mut status = self.strategy.status();
status.num_connected_peers = self.peers.len() as u32;
let _ = tx.send(status);
let _ = tx.send(self.strategy.status());
},
ToServiceCommand::NumActivePeers(tx) => {
let _ = tx.send(self.num_active_peers());
},
ToServiceCommand::SyncState(tx) => {
let _ = tx.send(self.strategy.status());
},
ToServiceCommand::BestSeenBlock(tx) => {
let _ = tx.send(self.strategy.status().best_seen_block);
},
ToServiceCommand::NumSyncPeers(tx) => {
let _ = tx.send(self.strategy.status().num_peers);
},
ToServiceCommand::NumQueuedBlocks(tx) => {
let _ = tx.send(self.strategy.status().queued_blocks);
},
ToServiceCommand::NumDownloadedBlocks(tx) => {
let _ = tx.send(self.strategy.num_downloaded_blocks());
},
ToServiceCommand::NumSyncRequests(tx) => {
let _ = tx.send(self.strategy.num_sync_requests());
},
ToServiceCommand::PeersInfo(tx) => {
let peers_info = self
.peers
.iter()
.map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
.collect();
let peers_info =
self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
let _ = tx.send(peers_info);
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
Expand Down Expand Up @@ -867,6 +849,7 @@ where
if let Some(metrics) = &self.metrics {
metrics.peers.dec();
}
self.num_connected.fetch_sub(1, Ordering::AcqRel);

if self.important_peers.contains(&peer_id) {
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
Expand Down Expand Up @@ -1046,6 +1029,7 @@ where
if let Some(metrics) = &self.metrics {
metrics.peers.inc();
}
self.num_connected.fetch_add(1, Ordering::AcqRel);
}
self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

Expand Down
37 changes: 7 additions & 30 deletions substrate/client/network/sync/src/service/syncing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub enum ToServiceCommand<B: BlockT> {
EventStream(TracingUnboundedSender<SyncEvent>),
Status(oneshot::Sender<SyncStatus<B>>),
NumActivePeers(oneshot::Sender<usize>),
SyncState(oneshot::Sender<SyncStatus<B>>),
BestSeenBlock(oneshot::Sender<Option<NumberFor<B>>>),
NumSyncPeers(oneshot::Sender<u32>),
NumQueuedBlocks(oneshot::Sender<u32>),
NumDownloadedBlocks(oneshot::Sender<usize>),
NumSyncRequests(oneshot::Sender<usize>),
PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
Expand Down Expand Up @@ -83,6 +79,11 @@ impl<B: BlockT> SyncingService<B> {
Self { tx, num_connected, is_major_syncing }
}

/// Get the number of peers known to `SyncingEngine` (both full and light).
pub fn num_connected_peers(&self) -> usize {
self.num_connected.load(Ordering::Relaxed)
}

/// Get the number of active peers.
pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
Expand All @@ -91,30 +92,6 @@ impl<B: BlockT> SyncingService<B> {
rx.await
}

/// Get best seen block.
pub async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::BestSeenBlock(tx));

rx.await
}

/// Get the number of sync peers.
pub async fn num_sync_peers(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncPeers(tx));

rx.await
}

/// Get the number of queued blocks.
pub async fn num_queued_blocks(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumQueuedBlocks(tx));

rx.await
}

/// Get the number of downloaded blocks.
pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -149,11 +126,11 @@ impl<B: BlockT> SyncingService<B> {
/// Get sync status
///
/// Returns an error if `SyncingEngine` has terminated.
pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));

rx.await.map_err(|_| ())
rx.await
}
}

Expand Down
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ where
state: sync_state,
best_seen_block,
num_peers: self.peers.len() as u32,
num_connected_peers: 0u32,
queued_blocks: self.queue_blocks.len() as u32,
state_sync: self.state_sync.as_ref().map(|s| s.progress()),
warp_sync: warp_sync_progress,
Expand Down
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ impl<B: BlockT> StateStrategy<B> {
},
best_seen_block: Some(self.state_sync.target_number()),
num_peers: self.peers.len().saturated_into(),
num_connected_peers: self.peers.len().saturated_into(),
queued_blocks: 0,
state_sync: Some(self.state_sync.progress()),
warp_sync: None,
Expand Down
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,6 @@ where
Phase::Complete => None,
},
num_peers: self.peers.len().saturated_into(),
num_connected_peers: self.peers.len().saturated_into(),
queued_blocks: 0,
state_sync: None,
warp_sync: Some(self.progress()),
Expand Down
15 changes: 12 additions & 3 deletions substrate/client/network/sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct PeerInfo<Block: BlockT> {
}

/// Info about a peer's known state (both full and light).
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct ExtendedPeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
Expand All @@ -49,6 +49,17 @@ pub struct ExtendedPeerInfo<B: BlockT> {
pub best_number: NumberFor<B>,
}

impl<B> Clone for ExtendedPeerInfo<B>
where
B: BlockT,
{
fn clone(&self) -> Self {
Self { roles: self.roles, best_hash: self.best_hash, best_number: self.best_number }
}
}

impl<B> Copy for ExtendedPeerInfo<B> where B: BlockT {}

/// Reported sync state.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum SyncState<BlockNumber> {
Expand Down Expand Up @@ -76,8 +87,6 @@ pub struct SyncStatus<Block: BlockT> {
pub best_seen_block: Option<NumberFor<Block>>,
/// Number of peers participating in syncing.
pub num_peers: u32,
/// Number of peers known to `SyncingEngine` (both full and light).
pub num_connected_peers: u32,
/// Number of blocks queued for import
pub queued_blocks: u32,
/// State sync status in progress, if any.
Expand Down
10 changes: 4 additions & 6 deletions substrate/client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ where

/// Returns the number of peers we're connected to.
pub async fn num_peers(&self) -> usize {
self.sync_service.status().await.unwrap().num_connected_peers as usize
self.sync_service.num_connected_peers()
}

/// Returns the number of downloaded blocks.
Expand Down Expand Up @@ -1016,7 +1016,7 @@ pub trait TestNetFactory: Default + Sized + Send {

for peer in peers {
if peer.sync_service.is_major_syncing() ||
peer.sync_service.num_queued_blocks().await.unwrap() != 0
peer.sync_service.status().await.unwrap().queued_blocks != 0
{
return false
}
Expand All @@ -1036,7 +1036,7 @@ pub trait TestNetFactory: Default + Sized + Send {
async fn is_idle(&mut self) -> bool {
let peers = self.peers_mut();
for peer in peers {
if peer.sync_service.num_queued_blocks().await.unwrap() != 0 {
if peer.sync_service.status().await.unwrap().queued_blocks != 0 {
return false
}
if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
Expand Down Expand Up @@ -1094,9 +1094,7 @@ pub trait TestNetFactory: Default + Sized + Send {

'outer: loop {
for sync_service in &sync_services {
if sync_service.status().await.unwrap().num_connected_peers as usize !=
num_peers - 1
{
if sync_service.num_connected_peers() != num_peers - 1 {
futures::future::poll_fn::<(), _>(|cx| {
self.poll(cx);
Poll::Ready(())
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ async fn syncs_all_forks_from_single_peer() {
})
.await;

if net.peer(1).sync_service().best_seen_block().await.unwrap() == Some(12) {
if net.peer(1).sync_service().status().await.unwrap().best_seen_block == Some(12) {
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ pub async fn build_system_rpc_future<
sc_rpc::system::Request::SyncState(sender) => {
use sc_rpc::system::SyncState;

match sync_service.best_seen_block().await {
match sync_service.status().await.map(|status| status.best_seen_block) {
Ok(best_seen_block) => {
let best_number = client.info().best_number;
let _ = sender.send(SyncState {
Expand Down
Loading