Sync status refactoring (#5450)
As I was looking at the coupling between `SyncingEngine`,
`SyncingStrategy` and individual strategies, I noticed a few things that
were unused, redundant or awkward.

The awkward change comes from
paritytech/substrate#13700, where the
`num_connected_peers` property was added to the `SyncStatus` struct just
so it could be rendered in the informant. While convenient, the property
didn't really belong there and was annoyingly set to `0` in some
strategies and to `num_peers` in others. I have replaced it with a
property on `SyncingService`, which already stored the necessary
information internally.
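
To make the new shape concrete, here is a minimal, self-contained sketch of the pattern with stand-in types (the real `SyncingEngine` and `SyncingService` in `sc-network-sync` carry far more state): the engine updates a shared atomic counter on connect/disconnect, and the service handle reads it synchronously instead of pulling it out of `SyncStatus`.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Stand-in for `SyncingEngine`: owns the peer set and keeps the shared
// counter up to date as peers come and go.
struct Engine {
    num_connected: Arc<AtomicUsize>,
}

impl Engine {
    // Mirrors the `fetch_add`/`fetch_sub` calls this PR adds to the
    // connect/disconnect paths of `SyncingEngine`.
    fn on_peer_connected(&self) {
        self.num_connected.fetch_add(1, Ordering::AcqRel);
    }

    fn on_peer_disconnected(&self) {
        self.num_connected.fetch_sub(1, Ordering::AcqRel);
    }
}

// Stand-in for `SyncingService`: answers the peer-count question
// synchronously, without a round-trip to the engine.
struct Service {
    num_connected: Arc<AtomicUsize>,
}

impl Service {
    fn num_connected_peers(&self) -> usize {
        self.num_connected.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let engine = Engine { num_connected: Arc::clone(&counter) };
    let service = Service { num_connected: counter };

    engine.on_peer_connected();
    engine.on_peer_connected();
    engine.on_peer_disconnected();
    assert_eq!(service.num_connected_peers(), 1);
}
```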

Also, `ExtendedPeerInfo` didn't have a working `Clone` implementation
due to the lack of perfect derives in Rust. While I ended up not using
it in the refactoring, I included the fixed implementation in this PR
anyway.
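
For readers unfamiliar with the perfect-derive limitation: `#[derive(Clone)]` on a generic struct bounds the type parameter itself by `Clone`, even when no field actually stores a `B`, so the derived impl doesn't apply for non-`Clone` block types. A minimal reproduction with hypothetical stand-in types (not the real `BlockT`), using the same manual-impl fix as `types.rs` below:

```rust
// Stand-in for the `BlockT` pattern: fields only use associated types
// of `B`, never a `B` value itself.
trait Block {
    type Hash: Copy;
}

// A concrete block type that is deliberately not `Clone`.
struct MyBlock;
impl Block for MyBlock {
    type Hash = u64;
}

// With `#[derive(Clone)]`, the generated impl would be
// `impl<B: Block + Clone> Clone for Info<B>`, so `Info<MyBlock>` would
// NOT be `Clone` even though its only field is a trivially copyable hash.
struct Info<B: Block> {
    best_hash: B::Hash,
}

// The manual impl only requires what the fields actually require.
impl<B: Block> Clone for Info<B> {
    fn clone(&self) -> Self {
        Self { best_hash: self.best_hash }
    }
}

impl<B: Block> Copy for Info<B> {}

fn main() {
    let a = Info::<MyBlock> { best_hash: 42 };
    let b = a; // copies; no `.clone()` needed
    assert_eq!(a.best_hash, b.best_hash);
}
```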

While these changes are not strictly necessary for
#5333, they do reduce the
coupling between the syncing engine and the syncing strategy, which I
thought was a good thing.

As usual, reviewing individual commits will be easiest.

---------

Co-authored-by: Dmitry Markin <[email protected]>
nazar-pc and dmitry-markin authored Aug 26, 2024
1 parent 3cbefaf commit dd1aaa4
Showing 12 changed files with 56 additions and 73 deletions.
18 changes: 18 additions & 0 deletions prdoc/pr_5450.prdoc
@@ -0,0 +1,18 @@
+title: Sync status refactoring
+
+doc:
+  - audience: Node Dev
+    description: |
+      The `SyncingService` API in `sc-network-sync` has changed: some redundant sync-status-related methods,
+      mostly used internally or for testing purposes, have been removed, so external code is unlikely to be impacted.
+      `ExtendedPeerInfo` now has working `Clone` and `Copy` implementations.
+
+crates:
+  - name: sc-informant
+    bump: major
+  - name: sc-network-sync
+    bump: major
+  - name: sc-network-test
+    bump: major
+  - name: sc-service
+    bump: major
2 changes: 1 addition & 1 deletion substrate/client/informant/src/display.rs
@@ -65,11 +65,11 @@ impl<B: BlockT> InformantDisplay<B> {
 		info: &ClientInfo<B>,
 		net_status: NetworkStatus,
 		sync_status: SyncStatus<B>,
+		num_connected_peers: usize,
 	) {
 		let best_number = info.chain.best_number;
 		let best_hash = info.chain.best_hash;
 		let finalized_number = info.chain.finalized_number;
-		let num_connected_peers = sync_status.num_connected_peers;
 		let speed = speed::<B>(best_number, self.last_number, self.last_update);
 		let total_bytes_inbound = net_status.total_bytes_inbound;
 		let total_bytes_outbound = net_status.total_bytes_outbound;
14 changes: 7 additions & 7 deletions substrate/client/informant/src/lib.rs
@@ -24,7 +24,7 @@ use futures_timer::Delay;
 use log::{debug, info, trace};
 use sc_client_api::{BlockchainEvents, UsageProvider};
 use sc_network::NetworkStatusProvider;
-use sc_network_sync::SyncStatusProvider;
+use sc_network_sync::{SyncStatusProvider, SyncingService};
 use sp_blockchain::HeaderMetadata;
 use sp_runtime::traits::{Block as BlockT, Header};
 use std::{collections::VecDeque, fmt::Display, sync::Arc, time::Duration};
@@ -37,10 +37,9 @@ fn interval(duration: Duration) -> impl Stream<Item = ()> + Unpin {
 }

 /// Builds the informant and returns a `Future` that drives the informant.
-pub async fn build<B: BlockT, C, N, S>(client: Arc<C>, network: N, syncing: S)
+pub async fn build<B: BlockT, C, N>(client: Arc<C>, network: N, syncing: Arc<SyncingService<B>>)
 where
 	N: NetworkStatusProvider,
-	S: SyncStatusProvider<B>,
 	C: UsageProvider<B> + HeaderMetadata<B> + BlockchainEvents<B>,
 	<C as HeaderMetadata<B>>::Error: Display,
 {
@@ -52,13 +51,14 @@ where
 		.filter_map(|_| async {
 			let net_status = network.status().await;
 			let sync_status = syncing.status().await;
+			let num_connected_peers = syncing.num_connected_peers();

-			match (net_status.ok(), sync_status.ok()) {
-				(Some(net), Some(sync)) => Some((net, sync)),
+			match (net_status, sync_status) {
+				(Ok(net), Ok(sync)) => Some((net, sync, num_connected_peers)),
 				_ => None,
 			}
 		})
-		.for_each(move |(net_status, sync_status)| {
+		.for_each(move |(net_status, sync_status, num_connected_peers)| {
 			let info = client_1.usage_info();
 			if let Some(ref usage) = info.usage {
 				trace!(target: "usage", "Usage statistics: {}", usage);
@@ -68,7 +68,7 @@ where
 				"Usage statistics not displayed as backend does not provide it",
 			)
 		}
-		display.display(&info, net_status, sync_status);
+		display.display(&info, net_status, sync_status, num_connected_peers);
 		future::ready(())
 	});
26 changes: 5 additions & 21 deletions substrate/client/network/sync/src/engine.rs
@@ -615,7 +615,6 @@
 		}

 		// Update atomic variables
-		self.num_connected.store(self.peers.len(), Ordering::Relaxed);
 		self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);

 		// Process actions requested by a syncing strategy.
@@ -761,37 +760,20 @@
 					);
 				},
 				ToServiceCommand::Status(tx) => {
-					let mut status = self.strategy.status();
-					status.num_connected_peers = self.peers.len() as u32;
-					let _ = tx.send(status);
+					let _ = tx.send(self.strategy.status());
 				},
 				ToServiceCommand::NumActivePeers(tx) => {
 					let _ = tx.send(self.num_active_peers());
 				},
-				ToServiceCommand::SyncState(tx) => {
-					let _ = tx.send(self.strategy.status());
-				},
-				ToServiceCommand::BestSeenBlock(tx) => {
-					let _ = tx.send(self.strategy.status().best_seen_block);
-				},
-				ToServiceCommand::NumSyncPeers(tx) => {
-					let _ = tx.send(self.strategy.status().num_peers);
-				},
-				ToServiceCommand::NumQueuedBlocks(tx) => {
-					let _ = tx.send(self.strategy.status().queued_blocks);
-				},
 				ToServiceCommand::NumDownloadedBlocks(tx) => {
 					let _ = tx.send(self.strategy.num_downloaded_blocks());
 				},
 				ToServiceCommand::NumSyncRequests(tx) => {
 					let _ = tx.send(self.strategy.num_sync_requests());
 				},
 				ToServiceCommand::PeersInfo(tx) => {
-					let peers_info = self
-						.peers
-						.iter()
-						.map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
-						.collect();
+					let peers_info =
+						self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
 					let _ = tx.send(peers_info);
 				},
 				ToServiceCommand::OnBlockFinalized(hash, header) =>
@@ -867,6 +849,7 @@
 		if let Some(metrics) = &self.metrics {
 			metrics.peers.dec();
 		}
+		self.num_connected.fetch_sub(1, Ordering::AcqRel);

 		if self.important_peers.contains(&peer_id) {
 			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
@@ -1046,6 +1029,7 @@
 			if let Some(metrics) = &self.metrics {
 				metrics.peers.inc();
 			}
+			self.num_connected.fetch_add(1, Ordering::AcqRel);
 		}
 		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

37 changes: 7 additions & 30 deletions substrate/client/network/sync/src/service/syncing_service.rs
@@ -50,10 +50,6 @@ pub enum ToServiceCommand<B: BlockT> {
 	EventStream(TracingUnboundedSender<SyncEvent>),
 	Status(oneshot::Sender<SyncStatus<B>>),
 	NumActivePeers(oneshot::Sender<usize>),
-	SyncState(oneshot::Sender<SyncStatus<B>>),
-	BestSeenBlock(oneshot::Sender<Option<NumberFor<B>>>),
-	NumSyncPeers(oneshot::Sender<u32>),
-	NumQueuedBlocks(oneshot::Sender<u32>),
 	NumDownloadedBlocks(oneshot::Sender<usize>),
 	NumSyncRequests(oneshot::Sender<usize>),
 	PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
@@ -83,6 +79,11 @@ impl<B: BlockT> SyncingService<B> {
 		Self { tx, num_connected, is_major_syncing }
 	}

+	/// Get the number of peers known to `SyncingEngine` (both full and light).
+	pub fn num_connected_peers(&self) -> usize {
+		self.num_connected.load(Ordering::Relaxed)
+	}
+
 	/// Get the number of active peers.
 	pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
 		let (tx, rx) = oneshot::channel();
@@ -91,30 +92,6 @@ impl<B: BlockT> SyncingService<B> {
 		rx.await
 	}

-	/// Get best seen block.
-	pub async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
-		let (tx, rx) = oneshot::channel();
-		let _ = self.tx.unbounded_send(ToServiceCommand::BestSeenBlock(tx));
-
-		rx.await
-	}
-
-	/// Get the number of sync peers.
-	pub async fn num_sync_peers(&self) -> Result<u32, oneshot::Canceled> {
-		let (tx, rx) = oneshot::channel();
-		let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncPeers(tx));
-
-		rx.await
-	}
-
-	/// Get the number of queued blocks.
-	pub async fn num_queued_blocks(&self) -> Result<u32, oneshot::Canceled> {
-		let (tx, rx) = oneshot::channel();
-		let _ = self.tx.unbounded_send(ToServiceCommand::NumQueuedBlocks(tx));
-
-		rx.await
-	}
-
 	/// Get the number of downloaded blocks.
 	pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
 		let (tx, rx) = oneshot::channel();
@@ -149,11 +126,11 @@ impl<B: BlockT> SyncingService<B> {
 	/// Get sync status
 	///
 	/// Returns an error if `SyncingEngine` has terminated.
-	pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
+	pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
 		let (tx, rx) = oneshot::channel();
 		let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));

-		rx.await.map_err(|_| ())
+		rx.await
 	}
 }
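
Every remaining async getter on `SyncingService` follows the same request/response shape: push a command onto the engine's unbounded channel and await a oneshot reply. That is why `status()` can now surface `oneshot::Canceled` directly instead of flattening it into `()`. A self-contained sketch of that pattern under simplified assumptions (toy `Command` enum, `u32` payload, not the actual service types):

```rust
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};

// A simplified stand-in for `ToServiceCommand`: each request carries a
// oneshot sender on which the engine posts the reply.
enum Command {
    Status(oneshot::Sender<u32>),
}

// Service side: fire a command and await the reply. If the engine has
// terminated, the oneshot sender is dropped and the receiver resolves
// to `Err(oneshot::Canceled)` - hence the new return type of `status()`.
async fn status(tx: &mpsc::UnboundedSender<Command>) -> Result<u32, oneshot::Canceled> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let _ = tx.unbounded_send(Command::Status(reply_tx));
    reply_rx.await
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded();

    // Engine side: drain commands from the channel and answer them.
    let engine = async move {
        while let Some(Command::Status(reply)) = rx.next().await {
            let _ = reply.send(7);
        }
    };

    let client = async move {
        assert_eq!(status(&tx).await.unwrap(), 7);
        // `tx` is dropped here, which terminates the engine loop.
    };

    futures::executor::block_on(async {
        futures::join!(engine, client);
    });
}
```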
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/chain_sync.rs
@@ -438,7 +438,6 @@
 			state: sync_state,
 			best_seen_block,
 			num_peers: self.peers.len() as u32,
-			num_connected_peers: 0u32,
 			queued_blocks: self.queue_blocks.len() as u32,
 			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
 			warp_sync: warp_sync_progress,
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/state.rs
@@ -340,7 +340,6 @@ impl<B: BlockT> StateStrategy<B> {
 			},
 			best_seen_block: Some(self.state_sync.target_number()),
 			num_peers: self.peers.len().saturated_into(),
-			num_connected_peers: self.peers.len().saturated_into(),
 			queued_blocks: 0,
 			state_sync: Some(self.state_sync.progress()),
 			warp_sync: None,
1 change: 0 additions & 1 deletion substrate/client/network/sync/src/strategy/warp.rs
@@ -576,7 +576,6 @@
 				Phase::Complete => None,
 			},
 			num_peers: self.peers.len().saturated_into(),
-			num_connected_peers: self.peers.len().saturated_into(),
 			queued_blocks: 0,
 			state_sync: None,
 			warp_sync: Some(self.progress()),
15 changes: 12 additions & 3 deletions substrate/client/network/sync/src/types.rs
@@ -39,7 +39,7 @@ pub struct PeerInfo<Block: BlockT> {
 }

 /// Info about a peer's known state (both full and light).
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct ExtendedPeerInfo<B: BlockT> {
 	/// Roles
 	pub roles: Roles,
@@ -49,6 +49,17 @@ pub struct ExtendedPeerInfo<B: BlockT> {
 	pub best_number: NumberFor<B>,
 }

+impl<B> Clone for ExtendedPeerInfo<B>
+where
+	B: BlockT,
+{
+	fn clone(&self) -> Self {
+		Self { roles: self.roles, best_hash: self.best_hash, best_number: self.best_number }
+	}
+}
+
+impl<B> Copy for ExtendedPeerInfo<B> where B: BlockT {}
+
 /// Reported sync state.
 #[derive(Clone, Eq, PartialEq, Debug)]
 pub enum SyncState<BlockNumber> {
@@ -76,8 +87,6 @@ pub struct SyncStatus<Block: BlockT> {
 	pub best_seen_block: Option<NumberFor<Block>>,
 	/// Number of peers participating in syncing.
 	pub num_peers: u32,
-	/// Number of peers known to `SyncingEngine` (both full and light).
-	pub num_connected_peers: u32,
 	/// Number of blocks queued for import
 	pub queued_blocks: u32,
 	/// State sync status in progress, if any.
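
A side note on the new `Copy` impl: it is what lets the `PeersInfo` handler in the `engine.rs` diff above hand out `peer.info` by value instead of calling `peer.info.clone()`.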
10 changes: 4 additions & 6 deletions substrate/client/network/test/src/lib.rs
@@ -266,7 +266,7 @@ where

 	/// Returns the number of peers we're connected to.
 	pub async fn num_peers(&self) -> usize {
-		self.sync_service.status().await.unwrap().num_connected_peers as usize
+		self.sync_service.num_connected_peers()
 	}

 	/// Returns the number of downloaded blocks.
@@ -1016,7 +1016,7 @@ pub trait TestNetFactory: Default + Sized + Send {

 		for peer in peers {
 			if peer.sync_service.is_major_syncing() ||
-				peer.sync_service.num_queued_blocks().await.unwrap() != 0
+				peer.sync_service.status().await.unwrap().queued_blocks != 0
 			{
 				return false
 			}
@@ -1036,7 +1036,7 @@
 	async fn is_idle(&mut self) -> bool {
 		let peers = self.peers_mut();
 		for peer in peers {
-			if peer.sync_service.num_queued_blocks().await.unwrap() != 0 {
+			if peer.sync_service.status().await.unwrap().queued_blocks != 0 {
 				return false
 			}
 			if peer.sync_service.num_sync_requests().await.unwrap() != 0 {
@@ -1094,9 +1094,7 @@

 		'outer: loop {
 			for sync_service in &sync_services {
-				if sync_service.status().await.unwrap().num_connected_peers as usize !=
-					num_peers - 1
-				{
+				if sync_service.num_connected_peers() != num_peers - 1 {
 					futures::future::poll_fn::<(), _>(|cx| {
 						self.poll(cx);
 						Poll::Ready(())
2 changes: 1 addition & 1 deletion substrate/client/network/test/src/sync.rs
@@ -1049,7 +1049,7 @@ async fn syncs_all_forks_from_single_peer() {
 			})
 			.await;

-		if net.peer(1).sync_service().best_seen_block().await.unwrap() == Some(12) {
+		if net.peer(1).sync_service().status().await.unwrap().best_seen_block == Some(12) {
 			break
 		}
 	}
2 changes: 1 addition & 1 deletion substrate/client/service/src/lib.rs
@@ -341,7 +341,7 @@ pub async fn build_system_rpc_future<
 			sc_rpc::system::Request::SyncState(sender) => {
 				use sc_rpc::system::SyncState;

-				match sync_service.best_seen_block().await {
+				match sync_service.status().await.map(|status| status.best_seen_block) {
 					Ok(best_seen_block) => {
 						let best_number = client.info().best_number;
 						let _ = sender.send(SyncState {
