Skip to content

Commit

Permalink
Change network::sync::extra_requests to not send. (#2890)
Browse files Browse the repository at this point in the history
* Change network::sync::extra_requests to not send.

Instead it only maintains the invariants and leaves the actual I/O part
to the parent module (i.e. `sync`).

* Update Cargo.lock.
  • Loading branch information
twittner authored and arkpar committed Jun 18, 2019
1 parent 7eb2ff7 commit dc41558
Show file tree
Hide file tree
Showing 5 changed files with 413 additions and 370 deletions.
13 changes: 13 additions & 0 deletions substrate/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions substrate/core/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ zeroize = "0.6.0"
[dev-dependencies]
env_logger = { version = "0.6" }
keyring = { package = "substrate-keyring", path = "../../core/keyring" }
quickcheck = "0.8.5"
rand = "0.6.5"
test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" }
test_runtime = { package = "substrate-test-runtime", path = "../../core/test-runtime" }
consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] }
Expand Down
3 changes: 1 addition & 2 deletions substrate/core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let outcome = self.sync.on_block_justification_data(
&mut ProtocolContext::new(&mut self.context_data, network_out),
peer,
request,
response,
response
);

if let Some((origin, hash, nb, just)) = outcome {
Expand Down
137 changes: 85 additions & 52 deletions substrate/core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use client::{BlockStatus, ClientInfo};
use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}};
use client::error::Error as ClientError;
use blocks::BlockCollection;
use extra_requests::ExtraRequestsAggregator;
use extra_requests::ExtraRequests;
use runtime_primitives::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, One,
CheckedSub, SaturatedConversion
Expand Down Expand Up @@ -89,7 +89,7 @@ pub trait Context<B: BlockT> {
fn send_block_request(&mut self, who: PeerId, request: message::BlockRequest<B>);
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
pub common_number: NumberFor<B>,
pub best_hash: B::Hash,
Expand All @@ -98,8 +98,8 @@ pub(crate) struct PeerSync<B: BlockT> {
pub recently_announced: VecDeque<B::Hash>,
}

#[derive(Debug)]
/// Peer sync status.
#[derive(Debug)]
pub(crate) struct PeerInfo<B: BlockT> {
/// Their best block hash.
pub best_hash: B::Hash,
Expand Down Expand Up @@ -129,16 +129,17 @@ pub(crate) enum PeerSyncState<B: BlockT> {

/// Relay chain sync strategy.
pub struct ChainSync<B: BlockT> {
_genesis_hash: B::Hash,
peers: HashMap<PeerId, PeerSync<B>>,
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
role: Roles,
required_block_attributes: message::BlockAttributes,
extra_requests: ExtraRequestsAggregator<B>,
extra_finality_proofs: ExtraRequests<B>,
extra_justifications: ExtraRequests<B>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
request_builder: Option<SharedFinalityProofRequestBuilder<B>>,
}

/// Reported sync state.
Expand Down Expand Up @@ -179,26 +180,26 @@ impl<B: BlockT> Status<B> {

impl<B: BlockT> ChainSync<B> {
/// Create a new instance. Pass the initial known state of the chain.
pub(crate) fn new(
role: Roles,
info: &ClientInfo<B>,
) -> Self {
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
pub(crate) fn new(role: Roles, info: &ClientInfo<B>) -> Self {
let mut required_block_attributes =
message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;

if role.is_full() {
required_block_attributes |= message::BlockAttributes::BODY;
}

ChainSync {
_genesis_hash: info.chain.genesis_hash,
peers: HashMap::new(),
blocks: BlockCollection::new(),
best_queued_hash: info.chain.best_hash,
best_queued_number: info.chain.best_number,
extra_requests: ExtraRequestsAggregator::new(),
extra_finality_proofs: ExtraRequests::new(),
extra_justifications: ExtraRequests::new(),
role,
required_block_attributes,
queue_blocks: Default::default(),
best_importing_number: Zero::zero(),
request_builder: None,
}
}

Expand Down Expand Up @@ -499,9 +500,9 @@ impl<B: BlockT> ChainSync<B> {
&mut self,
protocol: &mut dyn Context<B>,
who: PeerId,
_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)>
{
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
Expand All @@ -522,12 +523,8 @@ impl<B: BlockT> ChainSync<B> {
protocol.disconnect_peer(who);
return None;
}

return self.extra_requests.justifications().on_response(
who,
response.justification,
);
},
return self.extra_justifications.on_response(who, response.justification)
}
None => {
// we might have asked the peer for a justification on a block that we thought it had
// (regardless of whether it had a justification for it or not).
Expand All @@ -536,7 +533,7 @@ impl<B: BlockT> ChainSync<B> {
hash,
);
return None;
},
}
}
}

Expand Down Expand Up @@ -574,10 +571,7 @@ impl<B: BlockT> ChainSync<B> {
return None;
}

return self.extra_requests.finality_proofs().on_response(
who,
response.proof,
);
return self.extra_finality_proofs.on_response(who, response.proof)
}

self.maintain_sync(protocol);
Expand All @@ -603,38 +597,70 @@ impl<B: BlockT> ChainSync<B> {
for peer in peers {
self.download_new(protocol, peer);
}
self.extra_requests.dispatch(&mut self.peers, protocol);
self.tick(protocol)
}

/// Called periodically to perform any time-based actions. Must be called at a regular
/// interval.
pub fn tick(&mut self, protocol: &mut dyn Context<B>) {
self.extra_requests.dispatch(&mut self.peers, protocol);
self.send_justification_requests(protocol);
self.send_finality_proof_request(protocol)
}

fn send_justification_requests(&mut self, protocol: &mut dyn Context<B>) {
let mut matcher = self.extra_justifications.matcher();
while let Some((peer, request)) = matcher.next(&self.peers) {
self.peers.get_mut(&peer)
.expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed")
.state = PeerSyncState::DownloadingJustification(request.0);
protocol.send_block_request(peer, message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Hash(request.0),
to: None,
direction: message::Direction::Ascending,
max: Some(1)
})
}
}

fn send_finality_proof_request(&mut self, protocol: &mut dyn Context<B>) {
let mut matcher = self.extra_finality_proofs.matcher();
while let Some((peer, request)) = matcher.next(&self.peers) {
self.peers.get_mut(&peer)
.expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed")
.state = PeerSyncState::DownloadingFinalityProof(request.0);
protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest {
id: 0,
block: request.0,
request: self.request_builder.as_ref()
.map(|builder| builder.build_request_data(&request.0))
.unwrap_or_default()
})
}
}

/// Request a justification for the given block.
///
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
self.extra_requests.justifications().queue_request(
(*hash, number),
|base, block| protocol.client().is_descendent_of(base, block),
);

self.extra_requests.justifications().dispatch(&mut self.peers, protocol);
self.extra_justifications.schedule((*hash, number), |base, block| {
protocol.client().is_descendent_of(base, block)
});
self.send_justification_requests(protocol)
}

/// Clears all pending justification requests.
pub fn clear_justification_requests(&mut self) {
self.extra_requests.justifications().clear();
self.extra_justifications.reset()
}

/// Call this when a justification has been processed by the import queue, with or without
/// errors.
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
if !self.extra_requests.justifications().on_import_result((hash, number), finalization_result) {
if !self.extra_justifications.try_finalize_root((hash, number), finalization_result, true) {
debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.",
hash,
number,
Expand All @@ -646,24 +672,22 @@ impl<B: BlockT> ChainSync<B> {
///
/// Queues a new finality proof request and tries to dispatch all pending requests.
pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
self.extra_requests.finality_proofs().queue_request(
(*hash, number),
|base, block| protocol.client().is_descendent_of(base, block),
);

self.extra_requests.finality_proofs().dispatch(&mut self.peers, protocol);
self.extra_finality_proofs.schedule((*hash, number), |base, block| {
protocol.client().is_descendent_of(base, block)
});
self.send_finality_proof_request(protocol)
}

pub fn finality_proof_import_result(
&mut self,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
self.extra_requests.finality_proofs().on_import_result(request_block, finalization_result);
self.extra_finality_proofs.try_finalize_root(request_block, finalization_result, true);
}

pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
self.extra_requests.finality_proofs().essence().0 = Some(request_builder);
pub fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
self.request_builder = Some(builder)
}

/// Notify about successful import of the given block.
Expand All @@ -673,13 +697,21 @@ impl<B: BlockT> ChainSync<B> {

/// Notify about finalization of the given block.
pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
if let Err(err) = self.extra_requests.on_block_finalized(
hash,
number,
&|base, block| protocol.client().is_descendent_of(base, block),
) {
warn!(target: "sync", "Error cleaning up pending extra data requests: {:?}", err);
};
let r = self.extra_finality_proofs.on_block_finalized(hash, number, |base, block| {
protocol.client().is_descendent_of(base, block)
});

if let Err(err) = r {
warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err);
}

let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
protocol.client().is_descendent_of(base, block)
});

if let Err(err) = r {
warn!(target: "sync", "Error cleaning up pending extra justification data requests: {:?}", err);
}
}

fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
Expand Down Expand Up @@ -859,7 +891,8 @@ impl<B: BlockT> ChainSync<B> {
pub(crate) fn peer_disconnected(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
self.blocks.clear_peer_download(&who);
self.peers.remove(&who);
self.extra_requests.peer_disconnected(who);
self.extra_justifications.peer_disconnected(&who);
self.extra_finality_proofs.peer_disconnected(&who);
self.maintain_sync(protocol);
}

Expand Down
Loading

0 comments on commit dc41558

Please sign in to comment.