Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Enable parallel block download
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Nov 4, 2019
1 parent 7874be8 commit 5ca8be4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 45 deletions.
49 changes: 13 additions & 36 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
return Ok(self.select_new_blocks(who).map(|(_, req)| req))
self.is_idle = false;
return Ok(None)
}

let common_best = std::cmp::min(self.best_queued_number, info.best_number);
Expand Down Expand Up @@ -567,6 +568,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Too many blocks in the queue.");
return Either::Left(std::iter::empty())
}
let major_sync = self.status().state == SyncState::Downloading;
let blocks = &mut self.blocks;
let attrs = &self.required_block_attributes;
let fork_targets = &self.fork_targets;
Expand Down Expand Up @@ -596,7 +598,7 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) {
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(target: "sync", "New block request for {}", id);
have_requests = true;
Expand Down Expand Up @@ -1123,39 +1125,6 @@ impl<B: BlockT> ChainSync<B> {
})
}

/// Select a range of new blocks to download from the given peer.
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
// when there are too many blocks in the queue => do not try to download new blocks
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return None
}

let peer = self.peers.get_mut(&who)?;

if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", who);
return None
}

trace!(
target: "sync",
"Considering new block download from {}, common block is {}, best is {:?}",
who,
peer.common_number,
peer.best_number
);

if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
peer.state = PeerSyncState::DownloadingNew(range.start);
Some((range, req))
} else {
trace!(target: "sync", "Nothing to request from {}", who);
None
}
}

/// What is the status of the block corresponding to the given hash?
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
if self.queue_blocks.contains(hash) {
Expand Down Expand Up @@ -1254,8 +1223,16 @@ fn peer_block_request<B: BlockT>(
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: &message::BlockAttributes,
major_sync: bool,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
let max_parallel = if major_sync { 1 } else { 3 };
if let Some(range) = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
peer.best_number,
peer.common_number,
max_parallel,
) {
let request = message::generic::BlockRequest {
id: 0,
fields: attrs.clone(),
Expand Down
20 changes: 11 additions & 9 deletions core/network/src/protocol/sync/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use libp2p::PeerId;
use sr_primitives::traits::{Block as BlockT, NumberFor, One};
use crate::message;

const MAX_PARALLEL_DOWNLOADS: u32 = 1;

/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
Expand Down Expand Up @@ -84,9 +82,7 @@ impl<B: BlockT> BlockCollection<B> {

match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
debug_assert!(false);
return;
trace!(target: "sync", "Inserting block data still marked as being downloaded: {}", start);
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
Expand All @@ -100,8 +96,15 @@ impl<B: BlockT> BlockCollection<B> {
}

/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>)
-> Option<Range<NumberFor<B>>> {
pub fn needed_blocks(
&mut self,
who: PeerId,
count: usize,
peer_best: NumberFor<B>,
common: NumberFor<B>,
max_parallel: u32,
) -> Option<Range<NumberFor<B>>>
{
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
let count = (count as u32).into();
Expand All @@ -112,7 +115,7 @@ impl<B: BlockT> BlockCollection<B> {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
if downloading < MAX_PARALLEL_DOWNLOADS =>
if downloading < max_parallel =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
Expand Down Expand Up @@ -185,7 +188,6 @@ impl<B: BlockT> BlockCollection<B> {
true
},
_ => {
debug_assert!(false);
false
}
};
Expand Down

0 comments on commit 5ca8be4

Please sign in to comment.