diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index 4f08c942def1b..34bc68f933686 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -392,7 +392,8 @@ impl ChainSync { state: PeerSyncState::Available, recently_announced: Default::default(), }); - return Ok(self.select_new_blocks(who).map(|(_, req)| req)) + self.is_idle = false; + return Ok(None) } let common_best = std::cmp::min(self.best_queued_number, info.best_number); @@ -567,6 +568,7 @@ impl ChainSync { trace!(target: "sync", "Too many blocks in the queue."); return Either::Left(std::iter::empty()) } + let major_sync = self.status().state == SyncState::Downloading; let blocks = &mut self.blocks; let attrs = &self.required_block_attributes; let fork_targets = &self.fork_targets; @@ -596,7 +598,7 @@ impl ChainSync { peer.state = PeerSyncState::DownloadingStale(hash); have_requests = true; Some((id.clone(), req)) - } else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) { + } else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) { peer.state = PeerSyncState::DownloadingNew(range.start); trace!(target: "sync", "New block request for {}", id); have_requests = true; @@ -1123,39 +1125,6 @@ impl ChainSync { }) } - /// Select a range of new blocks to download from the given peer. - fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, BlockRequest)> { - // when there are too many blocks in the queue => do not try to download new blocks - if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); - return None - } - - let peer = self.peers.get_mut(&who)?; - - if !peer.state.is_available() { - trace!(target: "sync", "Peer {} is busy", who); - return None - } - - trace!( - target: "sync", - "Considering new block download from {}, common block is {}, best is {:?}", - who, - peer.common_number, - peer.best_number - ); - - if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) { - trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); - peer.state = PeerSyncState::DownloadingNew(range.start); - Some((range, req)) - } else { - trace!(target: "sync", "Nothing to request from {}", who); - None - } - } - /// What is the status of the block corresponding to the given hash? fn block_status(&self, hash: &B::Hash) -> Result { if self.queue_blocks.contains(hash) { @@ -1254,8 +1223,16 @@ fn peer_block_request( peer: &PeerSync, blocks: &mut BlockCollection, attrs: &message::BlockAttributes, + major_sync: bool, ) -> Option<(Range>, BlockRequest)> { - if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + let max_parallel = if major_sync { 1 } else { 3 }; + if let Some(range) = blocks.needed_blocks( + id.clone(), + MAX_BLOCKS_TO_REQUEST, + peer.best_number, + peer.common_number, + max_parallel, + ) { let request = message::generic::BlockRequest { id: 0, fields: attrs.clone(), diff --git a/core/network/src/protocol/sync/blocks.rs b/core/network/src/protocol/sync/blocks.rs index 90264249ea03c..a972caf9519ec 100644 --- a/core/network/src/protocol/sync/blocks.rs +++ b/core/network/src/protocol/sync/blocks.rs @@ -24,8 +24,6 @@ use libp2p::PeerId; use sr_primitives::traits::{Block as BlockT, NumberFor, One}; use crate::message; -const MAX_PARALLEL_DOWNLOADS: u32 = 1; - /// Block data with origin. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockData { @@ -84,9 +82,7 @@ impl BlockCollection { match self.blocks.get(&start) { Some(&BlockRangeState::Downloading { .. }) => { - trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start); - debug_assert!(false); - return; + trace!(target: "sync", "Inserting block data still marked as being downloaded: {}", start); }, Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => { trace!(target: "sync", "Ignored block data already downloaded: {}", start); @@ -100,8 +96,15 @@ impl BlockCollection { } /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. - pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor, common: NumberFor) - -> Option>> { + pub fn needed_blocks( + &mut self, + who: PeerId, + count: usize, + peer_best: NumberFor, + common: NumberFor, + max_parallel: u32, + ) -> Option>> + { // First block number that we need to download let first_different = common + >::one(); let count = (count as u32).into(); @@ -112,7 +115,7 @@ impl BlockCollection { let next = downloading_iter.next(); break match &(prev, next) { &(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) - if downloading < MAX_PARALLEL_DOWNLOADS => + if downloading < max_parallel => (*start .. *start + *len, downloading), &(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start => (*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap @@ -185,7 +188,6 @@ impl BlockCollection { true }, _ => { - debug_assert!(false); false } }; @@ -242,18 +244,18 @@ mod test { let peer2 = PeerId::random(); let blocks = generate_blocks(150); - assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(1 .. 41)); - assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(41 .. 81)); - assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0), Some(81 .. 121)); + assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(1 .. 41)); + assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(41 .. 81)); + assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0, 1), Some(81 .. 121)); bc.clear_peer_download(&peer1); bc.insert(41, blocks[41..81].to_vec(), peer1.clone()); assert_eq!(bc.drain(1), vec![]); - assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(121 .. 151)); + assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(121 .. 151)); bc.clear_peer_download(&peer0); bc.insert(1, blocks[1..11].to_vec(), peer0.clone()); - assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41)); + assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(11 .. 41)); assert_eq!(bc.drain(1), blocks[1..11].iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()); @@ -267,7 +269,7 @@ mod test { .map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); bc.clear_peer_download(&peer2); - assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121)); + assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80, 1), Some(81 .. 121)); bc.clear_peer_download(&peer2); bc.insert(81, blocks[81..121].to_vec(), peer2.clone()); bc.clear_peer_download(&peer1); @@ -292,7 +294,7 @@ mod test { bc.blocks.insert(114305, BlockRangeState::Complete(blocks)); let peer0 = PeerId::random(); - assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000), Some(1 .. 100)); - assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128)); + assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000, 1), Some(1 .. 100)); + assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1), Some(100 + 128 .. 100 + 128 + 128)); } }