diff --git a/Cargo.lock b/Cargo.lock index 15f526edd955d..f570676f20e47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,7 +617,8 @@ name = "polkadot-candidate-agreement" version = "0.1.0" dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "polkadot-primitives 0.1.0", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1043,6 +1044,15 @@ dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-timer" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "triehash" version = "0.1.0" @@ -1238,6 +1248,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc" "checksum triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9291c7f0fae44858b5e087dd462afb382354120003778f1695b44aab98c7abd7" "checksum uint 0.1.0 (git+https://github.com/paritytech/primitives.git)" = "" "checksum unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e01da42520092d0cd2d6ac3ae69eb21a22ad43ff195676b86f8c37f487d6b80" diff --git a/candidate-agreement/Cargo.toml b/candidate-agreement/Cargo.toml index 9a2dc0ffb77db..8aa2d0001b5e8 100644 --- a/candidate-agreement/Cargo.toml +++ b/candidate-agreement/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -futures = "0.1" -polkadot-primitives = { path = "../primitives" } +futures = "0.1.17" +parking_lot = "0.4" +tokio-timer = "0.1.2" diff --git a/candidate-agreement/src/bft/accumulator.rs b/candidate-agreement/src/bft/accumulator.rs index 8999a9f29bb00..ab035737fb84e 100644 --- a/candidate-agreement/src/bft/accumulator.rs +++ b/candidate-agreement/src/bft/accumulator.rs @@ -117,37 +117,37 @@ struct VoteCounts { /// Accumulates messages for a given round of BFT consensus. /// -/// This isn't tied to the "view" of a single validator. It +/// This isn't tied to the "view" of a single authority. It /// keeps accurate track of the state of the BFT consensus based /// on all messages imported. #[derive(Debug)] -pub struct Accumulator +pub struct Accumulator where Candidate: Eq + Clone, Digest: Hash + Eq + Clone, - ValidatorId: Hash + Eq, + AuthorityId: Hash + Eq, Signature: Eq + Clone, { round_number: usize, threshold: usize, - round_proposer: ValidatorId, + round_proposer: AuthorityId, proposal: Option, - prepares: HashMap, - commits: HashMap, + prepares: HashMap, + commits: HashMap, vote_counts: HashMap, - advance_round: HashSet, + advance_round: HashSet, state: State, } -impl Accumulator +impl Accumulator where Candidate: Eq + Clone, Digest: Hash + Eq + Clone, - ValidatorId: Hash + Eq, + AuthorityId: Hash + Eq, Signature: Eq + Clone, { /// Create a new state accumulator. - pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self { + pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self { Accumulator { round_number, threshold, @@ -171,11 +171,6 @@ impl Accumulator &ValidatorId { - &self.round_proposer - } - pub fn proposal(&self) -> Option<&Candidate> { self.proposal.as_ref() } @@ -189,7 +184,7 @@ impl Accumulator, + message: LocalizedMessage, ) { // message from different round. @@ -210,7 +205,7 @@ impl Accumulator Accumulator Accumulator Accumulator::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { - sender: ValidatorId(5), + sender: AuthorityId(5), signature: Signature(999, 5), message: Message::Propose(1, Candidate(999)), }); @@ -382,7 +377,7 @@ mod tests { assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { - sender: ValidatorId(8), + sender: AuthorityId(8), signature: Signature(999, 8), message: Message::Propose(1, Candidate(999)), }); @@ -392,11 +387,11 @@ mod tests { #[test] fn reaches_prepare_phase() { - let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { - sender: ValidatorId(8), + sender: AuthorityId(8), signature: Signature(999, 8), message: Message::Propose(1, Candidate(999)), }); @@ -405,7 +400,7 @@ mod tests { for i in 0..6 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Prepare(1, Digest(999)), }); @@ -414,7 +409,7 @@ mod tests { } accumulator.import_message(LocalizedMessage { - sender: ValidatorId(7), + sender: AuthorityId(7), signature: Signature(999, 7), message: Message::Prepare(1, Digest(999)), }); @@ -427,11 +422,11 @@ mod tests { #[test] fn prepare_to_commit() { - let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { - sender: ValidatorId(8), + sender: AuthorityId(8), signature: Signature(999, 8), message: Message::Propose(1, Candidate(999)), }); @@ -440,7 +435,7 @@ mod tests { for i in 0..6 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Prepare(1, Digest(999)), }); @@ -449,7 +444,7 @@ mod tests { } accumulator.import_message(LocalizedMessage { - sender: ValidatorId(7), + sender: AuthorityId(7), signature: Signature(999, 7), message: Message::Prepare(1, Digest(999)), }); @@ -461,7 +456,7 @@ mod tests { for i in 0..6 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Commit(1, Digest(999)), }); @@ -473,7 +468,7 @@ mod tests { } accumulator.import_message(LocalizedMessage { - sender: ValidatorId(7), + sender: AuthorityId(7), signature: Signature(999, 7), message: Message::Commit(1, Digest(999)), }); @@ -486,11 +481,11 @@ mod tests { #[test] fn prepare_to_advance() { - let mut accumulator = Accumulator::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); accumulator.import_message(LocalizedMessage { - sender: ValidatorId(8), + sender: AuthorityId(8), signature: Signature(999, 8), message: Message::Propose(1, Candidate(999)), }); @@ -499,7 +494,7 @@ mod tests { for i in 0..7 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Prepare(1, Digest(999)), }); @@ -512,7 +507,7 @@ mod tests { for i in 0..6 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::AdvanceRound(1), }); @@ -524,7 +519,7 @@ mod tests { } accumulator.import_message(LocalizedMessage { - sender: ValidatorId(7), + sender: AuthorityId(7), signature: Signature(999, 7), message: Message::AdvanceRound(1), }); @@ -537,12 +532,12 @@ mod tests { #[test] fn conclude_different_than_proposed() { - let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Prepare(1, Digest(999)), }); @@ -555,7 +550,7 @@ mod tests { for i in 0..7 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Commit(1, Digest(999)), }); @@ -569,12 +564,12 @@ mod tests { #[test] fn begin_to_advance() { - let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(1, i), message: Message::AdvanceRound(1), }); @@ -588,12 +583,12 @@ mod tests { #[test] fn conclude_without_prepare() { - let mut accumulator = Accumulator::::new(1, 7, ValidatorId(8)); + let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); assert_eq!(accumulator.state(), &State::Begin); for i in 0..7 { accumulator.import_message(LocalizedMessage { - sender: ValidatorId(i), + sender: AuthorityId(i), signature: Signature(999, i), message: Message::Commit(1, Digest(999)), }); diff --git a/candidate-agreement/src/bft/mod.rs b/candidate-agreement/src/bft/mod.rs index b17092c451412..f131e44e1f8b9 100644 --- a/candidate-agreement/src/bft/mod.rs +++ b/candidate-agreement/src/bft/mod.rs @@ -76,30 +76,30 @@ pub trait Context { type Candidate: Debug + Eq + Clone; /// Candidate digest. type Digest: Debug + Hash + Eq + Clone; - /// Validator ID. - type ValidatorId: Debug + Hash + Eq + Clone; + /// Authority ID. + type AuthorityId: Debug + Hash + Eq + Clone; /// Signature. type Signature: Debug + Eq + Clone; /// A future that resolves when a round timeout is concluded. type RoundTimeout: Future; /// A future that resolves when a proposal is ready. - type Proposal: Future; + type CreateProposal: Future; - /// Get the local validator ID. - fn local_id(&self) -> Self::ValidatorId; + /// Get the local authority ID. + fn local_id(&self) -> Self::AuthorityId; /// Get the best proposal. - fn proposal(&self) -> Self::Proposal; + fn proposal(&self) -> Self::CreateProposal; /// Get the digest of a candidate. fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; - /// Sign a message using the local validator ID. + /// Sign a message using the local authority ID. fn sign_local(&self, message: Message) - -> LocalizedMessage; + -> LocalizedMessage; /// Get the proposer for a given round of consensus. - fn round_proposer(&self, round: usize) -> Self::ValidatorId; + fn round_proposer(&self, round: usize) -> Self::AuthorityId; /// Whether the candidate is valid. fn candidate_valid(&self, candidate: &Self::Candidate) -> bool; @@ -121,11 +121,11 @@ pub enum Communication { /// Type alias for a localized message using only type parameters from `Context`. // TODO: actual type alias when it's no longer a warning. -pub struct ContextCommunication(pub Communication); +pub struct ContextCommunication(pub Communication); impl Clone for ContextCommunication where - LocalizedMessage: Clone, + LocalizedMessage: Clone, PrepareJustification: Clone, { fn clone(&self) -> Self { @@ -242,19 +242,19 @@ enum LocalState { // - a higher threshold-prepare is broadcast to us. in this case we can // advance to the round of the threshold-prepare. this is an indication // that we have experienced severe asynchrony/clock drift with the remainder -// of the other validators, and it is unlikely that we can assist in +// of the other authorities, and it is unlikely that we can assist in // consensus meaningfully. nevertheless we make an attempt. struct Strategy { nodes: usize, max_faulty: usize, - fetching_proposal: Option, + fetching_proposal: Option, round_timeout: future::Fuse, local_state: LocalState, locked: Option>, notable_candidates: HashMap, - current_accumulator: Accumulator, - future_accumulator: Accumulator, - local_id: C::ValidatorId, + current_accumulator: Accumulator, + future_accumulator: Accumulator, + local_id: C::AuthorityId, } impl Strategy { @@ -290,7 +290,7 @@ impl Strategy { fn import_message( &mut self, - msg: LocalizedMessage + msg: LocalizedMessage ) { let round_number = msg.message.round_number(); @@ -330,7 +330,7 @@ impl Strategy { -> Poll, E> where C::RoundTimeout: Future, - C::Proposal: Future, + C::CreateProposal: Future, { let mut last_watermark = ( self.current_accumulator.round_number(), @@ -363,7 +363,7 @@ impl Strategy { -> Poll, E> where C::RoundTimeout: Future, - C::Proposal: Future, + C::CreateProposal: Future, { self.propose(context, sending)?; self.prepare(context, sending); @@ -413,7 +413,7 @@ impl Strategy { } fn propose(&mut self, context: &C, sending: &mut Sending>) - -> Result<(), ::Error> + -> Result<(), ::Error> { if let LocalState::Start = self.local_state { let mut propose = false; @@ -629,7 +629,7 @@ impl Future for Agreement where C: Context, C::RoundTimeout: Future, - C::Proposal: Future, + C::CreateProposal: Future, I: Stream,Error=E>, O: Sink,SinkError=E>, E: From, @@ -699,8 +699,15 @@ impl Future for Agreement /// conclude without having witnessed the conclusion. /// In general, this future should be pre-empted by the import of a justification /// set for this block height. -pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) +pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) -> Agreement + where + C: Context, + C::RoundTimeout: Future, + C::CreateProposal: Future, + I: Stream,Error=E>, + O: Sink,SinkError=E>, + E: From, { let strategy = Strategy::create(&context, nodes, max_faulty); Agreement { diff --git a/candidate-agreement/src/bft/tests.rs b/candidate-agreement/src/bft/tests.rs index ff66ff047658b..10ef9321242b1 100644 --- a/candidate-agreement/src/bft/tests.rs +++ b/candidate-agreement/src/bft/tests.rs @@ -18,11 +18,13 @@ use super::*; +use tests::Network; + use std::sync::{Arc, Mutex}; use std::time::Duration; use futures::prelude::*; -use futures::sync::{oneshot, mpsc}; +use futures::sync::oneshot; use futures::future::FutureResult; #[derive(Debug, PartialEq, Eq, Clone, Hash)] @@ -32,10 +34,10 @@ struct Candidate(usize); struct Digest(usize); #[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct ValidatorId(usize); +struct AuthorityId(usize); #[derive(Debug, PartialEq, Eq, Clone)] -struct Signature(Message, ValidatorId); +struct Signature(Message, AuthorityId); struct SharedContext { node_count: usize, @@ -87,13 +89,13 @@ impl SharedContext { self.current_round += 1; } - fn round_proposer(&self, round: usize) -> ValidatorId { - ValidatorId(round % self.node_count) + fn round_proposer(&self, round: usize) -> AuthorityId { + AuthorityId(round % self.node_count) } } struct TestContext { - local_id: ValidatorId, + local_id: AuthorityId, proposal: Mutex, shared: Arc>, } @@ -101,16 +103,16 @@ struct TestContext { impl Context for TestContext { type Candidate = Candidate; type Digest = Digest; - type ValidatorId = ValidatorId; + type AuthorityId = AuthorityId; type Signature = Signature; type RoundTimeout = Box>; - type Proposal = FutureResult; + type CreateProposal = FutureResult; - fn local_id(&self) -> ValidatorId { + fn local_id(&self) -> AuthorityId { self.local_id.clone() } - fn proposal(&self) -> Self::Proposal { + fn proposal(&self) -> Self::CreateProposal { let proposal = { let mut p = self.proposal.lock().unwrap(); let x = *p; @@ -126,7 +128,7 @@ impl Context for TestContext { } fn sign_local(&self, message: Message) - -> LocalizedMessage + -> LocalizedMessage { let signature = Signature(message.clone(), self.local_id.clone()); LocalizedMessage { @@ -136,7 +138,7 @@ impl Context for TestContext { } } - fn round_proposer(&self, round: usize) -> ValidatorId { + fn round_proposer(&self, round: usize) -> AuthorityId { self.shared.lock().unwrap().round_proposer(round) } @@ -149,70 +151,6 @@ impl Context for TestContext { } } -type Comm = ContextCommunication; - -struct Network { - endpoints: Vec>, - input: mpsc::UnboundedReceiver<(usize, Comm)>, -} - -impl Network { - fn new(nodes: usize) - -> (Network, Vec>, Vec>) - { - let mut inputs = Vec::with_capacity(nodes); - let mut outputs = Vec::with_capacity(nodes); - let mut endpoints = Vec::with_capacity(nodes); - - let (in_tx, in_rx) = mpsc::unbounded(); - for _ in 0..nodes { - let (out_tx, out_rx) = mpsc::unbounded(); - inputs.push(in_tx.clone()); - outputs.push(out_rx); - endpoints.push(out_tx); - } - - let network = Network { - endpoints, - input: in_rx, - }; - - (network, inputs, outputs) - } - - fn route_on_thread(self) { - ::std::thread::spawn(move || { let _ = self.wait(); }); - } -} - -impl Future for Network { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll<(), Error> { - match self.input.poll() { - Err(_) => Err(Error), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Ok(Async::Ready(())), - Ok(Async::Ready(Some((sender, item)))) => { - { - let receiving_endpoints = self.endpoints - .iter() - .enumerate() - .filter(|&(i, _)| i != sender) - .map(|(_, x)| x); - - for endpoint in receiving_endpoints { - let _ = endpoint.unbounded_send(item.clone()); - } - } - - self.poll() - } - } - } -} - fn timeout_in(t: Duration) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); ::std::thread::spawn(move || { @@ -240,7 +178,7 @@ fn consensus_completes_with_minimum_good() { .enumerate() .map(|(i, (tx, rx))| { let ctx = TestContext { - local_id: ValidatorId(i), + local_id: AuthorityId(i), proposal: Mutex::new(i), shared: shared_context.clone(), }; @@ -296,7 +234,7 @@ fn consensus_does_not_complete_without_enough_nodes() { .enumerate() .map(|(i, (tx, rx))| { let ctx = TestContext { - local_id: ValidatorId(i), + local_id: AuthorityId(i), proposal: Mutex::new(i), shared: shared_context.clone(), }; @@ -335,7 +273,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { round_number: locked_round, digest: locked_digest.clone(), signatures: (0..7) - .map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), ValidatorId(i))) + .map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), AuthorityId(i))) .collect() }.check(7, |_, _, s| Some(s.1.clone())).unwrap(); @@ -352,7 +290,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { .enumerate() .map(|(i, (tx, rx))| { let ctx = TestContext { - local_id: ValidatorId(i), + local_id: AuthorityId(i), proposal: Mutex::new(i), shared: shared_context.clone(), }; diff --git a/candidate-agreement/src/handle_incoming.rs b/candidate-agreement/src/handle_incoming.rs new file mode 100644 index 0000000000000..625c950784106 --- /dev/null +++ b/candidate-agreement/src/handle_incoming.rs @@ -0,0 +1,214 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A stream that handles incoming messages to the BFT agreement module and statement +//! table. It forwards as necessary, and dispatches requests for determining availability +//! and validity of candidates as necessary. + +use std::collections::HashSet; + +use futures::prelude::*; +use futures::stream::{Fuse, FuturesUnordered}; +use futures::sync::mpsc; + +use table::{self, Statement, Context as TableContext}; + +use super::{Context, CheckedMessage, SharedTable, TypeResolve}; + +enum CheckResult { + Available, + Unavailable, + Valid, + Invalid, +} + +enum Checking { + Availability(D, A), + Validity(D, V), +} + +impl Future for Checking + where + D: Clone, + A: Future, + V: Future, +{ + type Item = (D, CheckResult); + type Error = E; + + fn poll(&mut self) -> Poll { + Ok(Async::Ready(match *self { + Checking::Availability(ref digest, ref mut f) => { + match try_ready!(f.poll()) { + true => (digest.clone(), CheckResult::Available), + false => (digest.clone(), CheckResult::Unavailable), + } + } + Checking::Validity(ref digest, ref mut f) => { + match try_ready!(f.poll()) { + true => (digest.clone(), CheckResult::Valid), + false => (digest.clone(), CheckResult::Invalid), + } + } + })) + } +} + +/// Handles incoming messages to the BFT service and statement table. +/// +/// Also triggers requests for determining validity and availability of other +/// parachain candidates. +pub struct HandleIncoming { + table: SharedTable, + messages_in: Fuse, + bft_out: mpsc::UnboundedSender<::BftCommunication>, + local_id: C::AuthorityId, + requesting_about: FuturesUnordered::Future, + ::Future, + >>, + checked_validity: HashSet, + checked_availability: HashSet, +} + +impl HandleIncoming { + fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) { + let statement = match result { + CheckResult::Valid => Statement::Valid(digest), + CheckResult::Invalid => Statement::Invalid(digest), + CheckResult::Available => Statement::Available(digest), + CheckResult::Unavailable => return, // no such statement and not provable. + }; + + // TODO: trigger broadcast to peers immediately? + self.table.sign_and_import(statement); + } + + fn import_message(&mut self, origin: C::AuthorityId, message: CheckedMessage) { + match message { + CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); } + CheckedMessage::Table(table_messages) => { + // import all table messages and check for any that we + // need to produce statements for. + let msg_iter = table_messages + .into_iter() + .map(|m| (m, Some(origin.clone()))); + let summaries: Vec<_> = self.table.import_statements(msg_iter); + + for summary in summaries { + self.dispatch_on_summary(summary) + } + } + } + } + + // on new candidates in our group, begin checking validity. + // on new candidates in our availability sphere, begin checking availability. + fn dispatch_on_summary(&mut self, summary: table::Summary) { + let is_validity_member = + self.table.context().is_member_of(&self.local_id, &summary.group_id); + + let is_availability_member = + self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id); + + let digest = &summary.candidate; + + // TODO: consider a strategy based on the number of candidate votes as well. + let checking_validity = + is_validity_member && + self.checked_validity.insert(digest.clone()) && + self.table.proposed_digest() != Some(digest.clone()); + + let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); + + if checking_validity || checking_availability { + let context = &*self.table.context(); + let requesting_about = &mut self.requesting_about; + self.table.with_candidate(digest, |c| match c { + None => {} // TODO: handle table inconsistency somehow? + Some(candidate) => { + if checking_validity { + let future = context.check_validity(candidate).into_future(); + let checking = Checking::Validity(digest.clone(), future); + requesting_about.push(checking); + } + + if checking_availability { + let future = context.check_availability(candidate).into_future(); + let checking = Checking::Availability(digest.clone(), future); + requesting_about.push(checking); + } + } + }) + } + } +} + +impl HandleIncoming + where + C: Context, + I: Stream),Error=E>, + C::CheckAvailability: IntoFuture, + C::CheckCandidate: IntoFuture, +{ + pub fn new( + table: SharedTable, + messages_in: I, + bft_out: mpsc::UnboundedSender<::BftCommunication>, + ) -> Self { + let local_id = table.context().local_id(); + + HandleIncoming { + table, + bft_out, + local_id, + messages_in: messages_in.fuse(), + requesting_about: FuturesUnordered::new(), + checked_validity: HashSet::new(), + checked_availability: HashSet::new(), + } + } +} + +impl Future for HandleIncoming + where + C: Context, + I: Stream),Error=E>, + C::CheckAvailability: IntoFuture, + C::CheckCandidate: IntoFuture, +{ + type Item = (); + type Error = E; + + fn poll(&mut self) -> Poll<(), E> { + loop { + // FuturesUnordered is safe to poll after it has completed. + while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? { + self.sign_and_import_statement(d, r); + } + + match try_ready!(self.messages_in.poll()) { + None => if self.requesting_about.is_empty() { + return Ok(Async::Ready(())) + } else { + return Ok(Async::NotReady) + }, + Some((origin, msg)) => self.import_message(origin, msg), + } + } + } +} diff --git a/candidate-agreement/src/lib.rs b/candidate-agreement/src/lib.rs index 09dd56f5f0874..2cf4be5c54715 100644 --- a/candidate-agreement/src/lib.rs +++ b/candidate-agreement/src/lib.rs @@ -16,21 +16,610 @@ //! Propagation and agreement of candidates. //! -//! Validators are split into groups by parachain, and each validator might come -//! up its own candidate for their parachain. Within groups, validators pass around +//! Authorities are split into groups by parachain, and each authority might come +//! up its own candidate for their parachain. Within groups, authorities pass around //! their candidates and produce statements of validity. //! -//! Any candidate that receives majority approval by the validators in a group -//! may be subject to inclusion, unless any validators flag that candidate as invalid. +//! Any candidate that receives majority approval by the authorities in a group +//! may be subject to inclusion, unless any authorities flag that candidate as invalid. //! //! Wrongly flagging as invalid should be strongly disincentivized, so that in the //! equilibrium state it is not expected to happen. Likewise with the submission //! of invalid blocks. //! -//! Groups themselves may be compromised by malicious validators. +//! Groups themselves may be compromised by malicious authorities. +#[macro_use] extern crate futures; -extern crate polkadot_primitives as primitives; +extern crate parking_lot; +extern crate tokio_timer; -pub mod bft; -pub mod table; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; + +use futures::prelude::*; +use futures::sync::{mpsc, oneshot}; +use parking_lot::Mutex; +use tokio_timer::Timer; + +use table::Table; + +mod bft; +mod handle_incoming; +mod round_robin; +mod table; + +#[cfg(test)] +pub mod tests; + +/// Context necessary for agreement. +pub trait Context: Send + Clone { + /// A authority ID + type AuthorityId: Debug + Hash + Eq + Clone + Ord; + /// The digest (hash or other unique attribute) of a candidate. + type Digest: Debug + Hash + Eq + Clone; + /// The group ID type + type GroupId: Debug + Hash + Ord + Eq + Clone; + /// A signature type. + type Signature: Debug + Eq + Clone; + /// Candidate type. In practice this will be a candidate receipt. + type ParachainCandidate: Debug + Ord + Eq + Clone; + /// The actual block proposal type. This is what is agreed upon, and + /// is composed of multiple candidates. + type Proposal: Debug + Eq + Clone; + + /// A future that resolves when a candidate is checked for validity. + /// + /// In Polkadot, this will involve fetching the corresponding block data, + /// producing the necessary ingress, and running the parachain validity function. + type CheckCandidate: IntoFuture; + + /// A future that resolves when availability of a candidate's external + /// data is checked. + type CheckAvailability: IntoFuture; + + /// The statement batch type. + type StatementBatch: StatementBatch< + Self::AuthorityId, + table::SignedStatement, + >; + + /// Get the digest of a candidate. + fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest; + + /// Get the digest of a proposal. + fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest; + + /// Get the group of a candidate. + fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId; + + /// Get the primary for a given round. + fn round_proposer(&self, round: usize) -> Self::AuthorityId; + + /// Check a candidate for validity. + fn check_validity(&self, candidate: &Self::ParachainCandidate) -> Self::CheckCandidate; + + /// Check availability of candidate data. + fn check_availability(&self, candidate: &Self::ParachainCandidate) -> Self::CheckAvailability; + + /// Attempt to combine a set of parachain candidates into a proposal. + /// + /// This may arbitrarily return `None`, but the intent is for `Some` + /// to only be returned when candidates from enough groups are known. + /// + /// "enough" may be subjective as well. + fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>) + -> Option; + + /// Check validity of a proposal. This should call out to the `check_candidate` + /// function for all parachain candidates contained within it, as well as + /// checking other validity constraints of the proposal. + fn proposal_valid(&self, proposal: &Self::Proposal, check_candidate: F) -> bool + where F: FnMut(&Self::ParachainCandidate) -> bool; + + /// Get the local authority ID. + fn local_id(&self) -> Self::AuthorityId; + + /// Sign a table validity statement with the local key. + fn sign_table_statement( + &self, + statement: &table::Statement + ) -> Self::Signature; + + /// Sign a BFT agreement message. + fn sign_bft_message(&self, &bft::Message) -> Self::Signature; +} + +/// Helper for type resolution for contexts until type aliases apply bounds. +pub trait TypeResolve { + type SignedTableStatement; + type BftCommunication; + type BftCommitted; + type Misbehavior; +} + +impl TypeResolve for C { + type SignedTableStatement = table::SignedStatement; + type BftCommunication = bft::Communication; + type BftCommitted = bft::Committed; + type Misbehavior = table::Misbehavior; +} + +/// Information about a specific group. +#[derive(Debug, Clone)] +pub struct GroupInfo { + /// Authorities meant to check validity of candidates. + pub validity_guarantors: HashSet, + /// Authorities meant to check availability of candidate data. + pub availability_guarantors: HashSet, + /// Number of votes needed for validity. + pub needed_validity: usize, + /// Number of votes needed for availability. + pub needed_availability: usize, +} + +struct TableContext { + context: C, + groups: HashMap>, +} + +impl ::std::ops::Deref for TableContext { + type Target = C; + + fn deref(&self) -> &C { + &self.context + } +} + +impl table::Context for TableContext { + type AuthorityId = C::AuthorityId; + type Digest = C::Digest; + type GroupId = C::GroupId; + type Signature = C::Signature; + type Candidate = C::ParachainCandidate; + + fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest { + C::candidate_digest(candidate) + } + + fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId { + C::candidate_group(candidate) + } + + fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool { + self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority)) + } + + fn is_availability_guarantor_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool { + self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority)) + } + + fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize) { + self.groups.get(group).map_or( + (usize::max_value(), usize::max_value()), + |g| (g.needed_validity, g.needed_availability), + ) + } +} + +// A shared table object. +struct SharedTableInner { + table: Table>, + proposed_digest: Option, + awaiting_proposal: Vec>, +} + +impl SharedTableInner { + fn import_statement( + &mut self, + context: &TableContext, + statement: ::SignedTableStatement, + received_from: Option + ) -> Option> { + self.table.import_statement(context, statement, received_from) + } + + fn update_proposal(&mut self, context: &TableContext) { + if self.awaiting_proposal.is_empty() { return } + let proposal_candidates = self.table.proposed_candidates(context); + if let Some(proposal) = context.context.create_proposal(proposal_candidates) { + for sender in self.awaiting_proposal.drain(..) { + let _ = sender.send(proposal.clone()); + } + } + } + + fn get_proposal(&mut self, context: &TableContext) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.awaiting_proposal.push(tx); + self.update_proposal(context); + rx + } + + fn proposal_valid(&mut self, context: &TableContext, proposal: &C::Proposal) -> bool { + context.context.proposal_valid(proposal, |contained_candidate| { + // check that the candidate is valid (has enough votes) + let digest = C::candidate_digest(contained_candidate); + self.table.candidate_includable(&digest, context) + }) + } +} + +/// A shared table object. +pub struct SharedTable { + context: Arc>, + inner: Arc>>, +} + +impl Clone for SharedTable { + fn clone(&self) -> Self { + SharedTable { + context: self.context.clone(), + inner: self.inner.clone() + } + } +} + +impl SharedTable { + /// Create a new shared table. + pub fn new(context: C, groups: HashMap>) -> Self { + SharedTable { + context: Arc::new(TableContext { context, groups }), + inner: Arc::new(Mutex::new(SharedTableInner { + table: Table::default(), + awaiting_proposal: Vec::new(), + proposed_digest: None, + })) + } + } + + /// Import a single statement. + pub fn import_statement( + &self, + statement: ::SignedTableStatement, + received_from: Option, + ) -> Option> { + self.inner.lock().import_statement(&*self.context, statement, received_from) + } + + /// Sign and import a local statement. + pub fn sign_and_import( + &self, + statement: table::Statement, + ) -> Option> { + let proposed_digest = match statement { + table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)), + _ => None, + }; + + let signed_statement = table::SignedStatement { + signature: self.context.sign_table_statement(&statement), + sender: self.context.local_id(), + statement, + }; + + let mut inner = self.inner.lock(); + if proposed_digest.is_some() { + inner.proposed_digest = proposed_digest; + } + + inner.import_statement(&*self.context, signed_statement, None) + } + + /// Import many statements at once. + /// + /// Provide an iterator yielding pairs of (statement, received_from). + pub fn import_statements(&self, iterable: I) -> U + where + I: IntoIterator::SignedTableStatement, Option)>, + U: ::std::iter::FromIterator>, + { + let mut inner = self.inner.lock(); + + iterable.into_iter().filter_map(move |(statement, received_from)| { + inner.import_statement(&*self.context, statement, received_from) + }).collect() + } + + /// Update the proposal sealing. + pub fn update_proposal(&self) { + self.inner.lock().update_proposal(&*self.context) + } + + /// Register interest in receiving a proposal when ready. + /// If one is ready immediately, it will be provided. + pub fn get_proposal(&self) -> oneshot::Receiver { + self.inner.lock().get_proposal(&*self.context) + } + + /// Check if a proposal is valid. + pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool { + self.inner.lock().proposal_valid(&*self.context, proposal) + } + + /// Execute a closure using a specific candidate. + /// + /// Deadlocks if called recursively. + pub fn with_candidate(&self, digest: &C::Digest, f: F) -> U + where F: FnOnce(Option<&C::ParachainCandidate>) -> U + { + let inner = self.inner.lock(); + f(inner.table.get_candidate(digest)) + } + + /// Get all witnessed misbehavior. + pub fn get_misbehavior(&self) -> HashMap::Misbehavior> { + self.inner.lock().table.get_misbehavior().clone() + } + + /// Fill a statement batch. + pub fn fill_batch(&self, batch: &mut C::StatementBatch) { + self.inner.lock().table.fill_batch(batch); + } + + /// Get the local proposed candidate digest. + pub fn proposed_digest(&self) -> Option { + self.inner.lock().proposed_digest.clone() + } + + // Get a handle to the table context. + fn context(&self) -> &TableContext { + &*self.context + } +} + +/// Errors that can occur during agreement. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Error { + IoTerminated, + FaultyTimer, + CannotPropose, +} + +impl From for Error { + fn from(_: bft::InputStreamConcluded) -> Error { + Error::IoTerminated + } +} + +/// Context owned by the BFT future necessary to execute the logic. +pub struct BftContext { + context: C, + table: SharedTable, + timer: Timer, + round_timeout_multiplier: u64, +} + +impl bft::Context for BftContext + where C::Proposal: 'static, +{ + type AuthorityId = C::AuthorityId; + type Digest = C::Digest; + type Signature = C::Signature; + type Candidate = C::Proposal; + type RoundTimeout = Box>; + type CreateProposal = Box>; + + fn local_id(&self) -> Self::AuthorityId { + self.context.local_id() + } + + fn proposal(&self) -> Self::CreateProposal { + Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose)) + } + + fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest { + C::proposal_digest(candidate) + } + + fn sign_local(&self, message: bft::Message) + -> bft::LocalizedMessage + { + let sender = self.local_id(); + let signature = self.context.sign_bft_message(&message); + bft::LocalizedMessage { + message, + sender, + signature, + } + } + + fn round_proposer(&self, round: usize) -> Self::AuthorityId { + self.context.round_proposer(round) + } + + fn candidate_valid(&self, proposal: &Self::Candidate) -> bool { + self.table.proposal_valid(proposal) + } + + fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { + let round = ::std::cmp::min(63, round) as u32; + let timeout = 1u64.checked_shl(round) + .unwrap_or_else(u64::max_value) + .saturating_mul(self.round_timeout_multiplier); + + Box::new(self.timer.sleep(Duration::from_secs(timeout)) + .map_err(|_| Error::FaultyTimer)) + } +} + + +/// Parameters necessary for agreement. +pub struct AgreementParams { + /// The context itself. + pub context: C, + /// For scheduling timeouts. + pub timer: Timer, + /// The statement table. + pub table: SharedTable, + /// The number of nodes. + pub nodes: usize, + /// The maximum number of faulty nodes. + pub max_faulty: usize, + /// The round timeout multiplier: 2^round_number is multiplied by this. + pub round_timeout_multiplier: u64, + /// The maximum amount of messages to queue. + pub message_buffer_size: usize, + /// Interval to attempt forming proposals over. + pub form_proposal_interval: Duration, +} + +/// Recovery for messages +pub trait MessageRecovery { + /// The unchecked message type. This implies that work hasn't been done + /// to decode the payload and check and authenticate a signature. + type UncheckedMessage; + + /// Attempt to transform a checked message into an unchecked. + fn check_message(&self, Self::UncheckedMessage) -> Option>; +} + +/// A batch of statements to send out. +pub trait StatementBatch { + /// Get the target authorities of these statements. + fn targets(&self) -> &[V]; + + /// If the batch is empty. + fn is_empty(&self) -> bool; + + /// Push a statement onto the batch. Returns false when the batch is full. + /// + /// This is meant to do work like incrementally serializing the statements + /// into a vector of bytes while making sure the length is below a certain + /// amount. + fn push(&mut self, statement: T) -> bool; +} + +/// Recovered and fully checked messages. +pub enum CheckedMessage { + /// Messages meant for the BFT agreement logic. + Bft(::BftCommunication), + /// Statements circulating about the table. + Table(Vec<::SignedTableStatement>), +} + +/// Outgoing messages to the network. +#[derive(Debug, Clone)] +pub enum OutgoingMessage { + /// Messages meant for BFT agreement peers. + Bft(::BftCommunication), + /// Batches of table statements. + Table(C::StatementBatch), +} + +/// Create an agreement future, and I/O streams. +// TODO: kill 'static bounds and use impl Future. +pub fn agree< + Context, + NetIn, + NetOut, + Recovery, + PropagateStatements, + LocalCandidate, + Err, +>( + params: AgreementParams, + net_in: NetIn, + net_out: NetOut, + recovery: Recovery, + propagate_statements: PropagateStatements, + local_candidate: LocalCandidate, +) + -> Box::BftCommitted,Error=Error>> + where + Context: ::Context + 'static, + Context::CheckCandidate: IntoFuture, + Context::CheckAvailability: IntoFuture, + NetIn: Stream),Error=Err> + 'static, + NetOut: Sink> + 'static, + Recovery: MessageRecovery + 'static, + PropagateStatements: Stream + 'static, + LocalCandidate: IntoFuture + 'static +{ + let (bft_in_in, bft_in_out) = mpsc::unbounded(); + let (bft_out_in, bft_out_out) = mpsc::unbounded(); + + let agreement = { + let bft_context = BftContext { + context: params.context, + table: params.table.clone(), + timer: params.timer.clone(), + round_timeout_multiplier: params.round_timeout_multiplier, + }; + + bft::agree( + bft_context, + params.nodes, + params.max_faulty, + bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated), + bft_out_in.sink_map_err(|_| Error::IoTerminated), + ) + }; + + let route_messages_in = { + let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size); + + let round_robin_recovered = round_robin + .filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x))); + + handle_incoming::HandleIncoming::new( + params.table.clone(), + round_robin_recovered, + bft_in_in, + ).map_err(|_| Error::IoTerminated) + }; + + let route_messages_out = { + let table = params.table.clone(); + let periodic_table_statements = propagate_statements + .or_else(|_| ::futures::future::empty()) // halt the stream instead of error. + .map(move |mut batch| { table.fill_batch(&mut batch); batch }) + .filter(|b| !b.is_empty()) + .map(OutgoingMessage::Table); + + let complete_out_stream = bft_out_out + .map_err(|_| Error::IoTerminated) + .map(|bft::ContextCommunication(x)| x) + .map(OutgoingMessage::Bft) + .select(periodic_table_statements); + + net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream) + }; + + let import_local_candidate = { + let table = params.table.clone(); + local_candidate + .into_future() + .map(table::Statement::Candidate) + .map(Some) + .or_else(|_| Ok(None)) + .map(move |s| if let Some(s) = s { + table.sign_and_import(s); + }) + }; + + let create_proposal_on_interval = { + let table = params.table; + params.timer.interval(params.form_proposal_interval) + .map_err(|_| Error::FaultyTimer) + .for_each(move |_| { table.update_proposal(); Ok(()) }) + }; + + // if these auxiliary futures terminate before the agreement, then + // that is an error. + let auxiliary_futures = route_messages_in.join4( + create_proposal_on_interval, + route_messages_out, + import_local_candidate, + ).and_then(|_| Err(Error::IoTerminated)); + + let future = agreement + .select(auxiliary_futures) + .map(|(committed, _)| committed) + .map_err(|(e, _)| e); + + Box::new(future) +} diff --git a/candidate-agreement/src/round_robin.rs b/candidate-agreement/src/round_robin.rs new file mode 100644 index 0000000000000..3f98507cab89d --- /dev/null +++ b/candidate-agreement/src/round_robin.rs @@ -0,0 +1,164 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Round-robin buffer for incoming messages. +//! +//! This takes batches of messages associated with a sender as input, +//! and yields messages in a fair order by sender. + +use std::collections::{Bound, BTreeMap, VecDeque}; + +use futures::prelude::*; +use futures::stream::Fuse; + +/// Implementation of the round-robin buffer for incoming messages. +#[derive(Debug)] +pub struct RoundRobinBuffer { + buffer: BTreeMap>, + last_processed_from: Option, + stored_messages: usize, + max_messages: usize, + inner: Fuse, +} + +impl RoundRobinBuffer { + /// Create a new round-robin buffer which holds up to a maximum + /// amount of messages. + pub fn new(stream: S, buffer_size: usize) -> Self { + RoundRobinBuffer { + buffer: BTreeMap::new(), + last_processed_from: None, + stored_messages: 0, + max_messages: buffer_size, + inner: stream.fuse(), + } + } +} + +impl RoundRobinBuffer { + fn next_message(&mut self) -> Option<(V, M)> { + if self.stored_messages == 0 { + return None + } + + // first pick up from the last authority we processed a message from + let mut next = { + let lower_bound = match self.last_processed_from { + None => Bound::Unbounded, + Some(ref x) => Bound::Excluded(x.clone()), + }; + + self.buffer.range_mut((lower_bound, Bound::Unbounded)) + .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) + .next() + }; + + // but wrap around to the beginning again if we got nothing. + if next.is_none() { + next = self.buffer.iter_mut() + .filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v))) + .next(); + } + + if let Some((ref authority, _)) = next { + self.stored_messages -= 1; + self.last_processed_from = Some(authority.clone()); + } + + next + } + + // import messages, discarding when the buffer is full. + fn import_messages(&mut self, sender: V, messages: Vec) { + let space_remaining = self.max_messages - self.stored_messages; + self.stored_messages += ::std::cmp::min(space_remaining, messages.len()); + + let v = self.buffer.entry(sender).or_insert_with(VecDeque::new); + v.extend(messages.into_iter().take(space_remaining)); + } +} + +impl Stream for RoundRobinBuffer + where S: Stream)> +{ + type Item = (V, M); + type Error = S::Error; + + fn poll(&mut self) -> Poll, S::Error> { + loop { + match self.inner.poll()? { + Async::NotReady | Async::Ready(None) => break, + Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs), + } + } + + let done = self.inner.is_done(); + Ok(match self.next_message() { + Some(msg) => Async::Ready(Some(msg)), + None => if done { Async::Ready(None) } else { Async::NotReady }, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::stream::{self, Stream}; + + #[derive(Debug, PartialEq, Eq)] + struct UncheckedMessage { data: Vec } + + #[test] + fn is_fair_and_wraps_around() { + let stream = stream::iter_ok(vec![ + (1, vec![ + UncheckedMessage { data: vec![1, 3, 5] }, + UncheckedMessage { data: vec![3, 5, 7] }, + UncheckedMessage { data: vec![5, 7, 9] }, + ]), + (2, vec![ + UncheckedMessage { data: vec![2, 4, 6] }, + UncheckedMessage { data: vec![4, 6, 8] }, + UncheckedMessage { data: vec![6, 8, 10] }, + ]), + ]); + + let round_robin = RoundRobinBuffer::new(stream, 100); + let output = round_robin.wait().collect::, ()>>().unwrap(); + + assert_eq!(output, vec![ + (1, UncheckedMessage { data: vec![1, 3, 5] }), + (2, UncheckedMessage { data: vec![2, 4, 6] }), + (1, UncheckedMessage { data: vec![3, 5, 7] }), + + (2, UncheckedMessage { data: vec![4, 6, 8] }), + (1, UncheckedMessage { data: vec![5, 7, 9] }), + (2, UncheckedMessage { data: vec![6, 8, 10] }), + ]); + } + + #[test] + fn discards_when_full() { + let stream = stream::iter_ok(vec![ + (1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect()) + ]); + + let round_robin = RoundRobinBuffer::new(stream, 100); + let output = round_robin.wait().collect::, ()>>().unwrap(); + + assert_eq!(output.len(), 100); + } +} diff --git a/candidate-agreement/src/table.rs b/candidate-agreement/src/table.rs index 381244e58b6d6..2909d219c6fbb 100644 --- a/candidate-agreement/src/table.rs +++ b/candidate-agreement/src/table.rs @@ -16,14 +16,14 @@ //! The statement table. //! -//! This stores messages other validators issue about candidates. +//! This stores messages other authorities issue about candidates. //! //! These messages are used to create a proposal submitted to a BFT consensus process. //! //! Proposals are formed of sets of candidates which have the requisite number of //! validity and availability votes. //! -//! Each parachain is associated with two sets of validators: those which can +//! Each parachain is associated with two sets of authorities: those which can //! propose and attest to validity of candidates, and those who can only attest //! to availability. @@ -32,35 +32,37 @@ use std::collections::hash_map::{HashMap, Entry}; use std::hash::Hash; use std::fmt::Debug; +use super::StatementBatch; + /// Context for the statement table. pub trait Context { - /// A validator ID - type ValidatorId: Hash + Eq + Clone + Debug; + /// A authority ID + type AuthorityId: Debug + Hash + Eq + Clone; /// The digest (hash or other unique attribute) of a candidate. - type Digest: Hash + Eq + Clone + Debug; - /// Candidate type. - type Candidate: Ord + Eq + Clone + Debug; + type Digest: Debug + Hash + Eq + Clone; /// The group ID type - type GroupId: Hash + Ord + Eq + Clone + Debug; + type GroupId: Debug + Hash + Ord + Eq + Clone; /// A signature type. - type Signature: Eq + Clone + Debug; + type Signature: Debug + Eq + Clone; + /// Candidate type. In practice this will be a candidate receipt. + type Candidate: Debug + Ord + Eq + Clone; /// get the digest of a candidate. - fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; + fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest; /// get the group of a candidate. - fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId; + fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId; - /// Whether a validator is a member of a group. + /// Whether a authority is a member of a group. /// Members are meant to submit candidates and vote on validity. - fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool; + fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool; - /// Whether a validator is an availability guarantor of a group. + /// Whether a authority is an availability guarantor of a group. /// Guarantors are meant to vote on availability for candidates submitted /// in a group. fn is_availability_guarantor_of( &self, - validator: &Self::ValidatorId, + authority: &Self::AuthorityId, group: &Self::GroupId, ) -> bool; @@ -69,26 +71,26 @@ pub trait Context { } /// Statements circulated among peers. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum Statement { - /// Broadcast by a validator to indicate that this is his candidate for + /// Broadcast by a authority to indicate that this is his candidate for /// inclusion. /// /// Broadcasting two different candidate messages per round is not allowed. Candidate(C), - /// Broadcast by a validator to attest that the candidate with given digest + /// Broadcast by a authority to attest that the candidate with given digest /// is valid. Valid(D), - /// Broadcast by a validator to attest that the auxiliary data for a candidate + /// Broadcast by a authority to attest that the auxiliary data for a candidate /// with given digest is available. Available(D), - /// Broadcast by a validator to attest that the candidate with given digest + /// Broadcast by a authority to attest that the candidate with given digest /// is invalid. Invalid(D), } /// A signed statement. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct SignedStatement { /// The statement. pub statement: Statement, @@ -98,23 +100,23 @@ pub struct SignedStatement { pub sender: V, } -// A unique trace for a class of valid statements issued by a validator. +// A unique trace for a class of valid statements issued by a authority. // -// We keep track of which statements we have received or sent to other validators +// We keep track of which statements we have received or sent to other authorities // in order to prevent relaying the same data multiple times. // -// The signature of the statement is replaced by the validator because the validator +// The signature of the statement is replaced by the authority because the authority // is unique while signatures are not (at least under common schemes like // Schnorr or ECDSA). #[derive(Hash, PartialEq, Eq, Clone)] enum StatementTrace { - /// The candidate proposed by the validator. + /// The candidate proposed by the authority. Candidate(V), - /// A validity statement from that validator about the given digest. + /// A validity statement from that authority about the given digest. Valid(V, D), - /// An invalidity statement from that validator about the given digest. + /// An invalidity statement from that authority about the given digest. Invalid(V, D), - /// An availability statement from that validator about the given digest. + /// An availability statement from that authority about the given digest. Available(V, D), } @@ -122,7 +124,7 @@ enum StatementTrace { /// /// Since there are three possible ways to vote, a double vote is possible in /// three possible combinations (unordered) -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum ValidityDoubleVote { /// Implicit vote by issuing and explicity voting validity. IssuedAndValidity((C, S), (D, S)), @@ -133,7 +135,7 @@ pub enum ValidityDoubleVote { } /// Misbehavior: declaring multiple candidates. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct MultipleCandidates { /// The first candidate seen. pub first: (C, S), @@ -142,7 +144,7 @@ pub struct MultipleCandidates { } /// Misbehavior: submitted statement for wrong group. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub struct UnauthorizedStatement { /// A signed statement which was submitted without proper authority. pub statement: SignedStatement, @@ -150,7 +152,7 @@ pub struct UnauthorizedStatement { /// Different kinds of misbehavior. All of these kinds of malicious misbehavior /// are easily provable and extremely disincentivized. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum Misbehavior { /// Voted invalid and valid on validity. ValidityDoubleVote(ValidityDoubleVote), @@ -168,7 +170,7 @@ pub trait ResolveMisbehavior { } impl ResolveMisbehavior for C { - type Misbehavior = Misbehavior; + type Misbehavior = Misbehavior; } // kinds of votes for validity @@ -201,9 +203,9 @@ pub struct Summary { pub struct CandidateData { group_id: C::GroupId, candidate: C::Candidate, - validity_votes: HashMap>, - availability_votes: HashMap, - indicated_bad_by: Vec, + validity_votes: HashMap>, + availability_votes: HashMap, + indicated_bad_by: Vec, } impl CandidateData { @@ -212,20 +214,9 @@ impl CandidateData { !self.indicated_bad_by.is_empty() } - /// Get an iterator over those who have indicated this candidate valid. - // TODO: impl trait - pub fn voted_valid_by<'a>(&'a self) -> Box + 'a> { - Box::new(self.validity_votes.iter().filter_map(|(v, vote)| { - match *vote { - ValidityVote::Issued(_) | ValidityVote::Valid(_) => Some(v.clone()), - ValidityVote::Invalid(_) => None, - } - })) - } - // Candidate data can be included in a proposal // if it has enough validity and availability votes - // and no validators have called it bad. + // and no authorities have called it bad. fn can_be_included(&self, validity_threshold: usize, availability_threshold: usize) -> bool { self.indicated_bad_by.is_empty() && self.validity_votes.len() >= validity_threshold @@ -243,35 +234,46 @@ impl CandidateData { } } -// validator metadata -struct ValidatorData { +// authority metadata +struct AuthorityData { proposal: Option<(C::Digest, C::Signature)>, - known_statements: HashSet>, + known_statements: HashSet>, } -/// Create a new, empty statement table. -pub fn create() -> Table { - Table { - validator_data: HashMap::default(), - detected_misbehavior: HashMap::default(), - candidate_votes: HashMap::default(), +impl Default for AuthorityData { + fn default() -> Self { + AuthorityData { + proposal: None, + known_statements: HashSet::default(), + } } } /// Stores votes -#[derive(Default)] pub struct Table { - validator_data: HashMap>, - detected_misbehavior: HashMap::Misbehavior>, + authority_data: HashMap>, + detected_misbehavior: HashMap::Misbehavior>, candidate_votes: HashMap>, } +impl Default for Table { + fn default() -> Self { + Table { + authority_data: HashMap::new(), + detected_misbehavior: HashMap::new(), + candidate_votes: HashMap::new(), + } + } +} + impl Table { /// Produce a set of proposed candidates. /// /// This will be at most one per group, consisting of the /// best candidate for each group with requisite votes for inclusion. - pub fn proposed_candidates(&self, context: &C) -> Vec { + /// + /// The vector is sorted in ascending order by group id. + pub fn proposed_candidates<'a>(&'a self, context: &C) -> Vec<&'a C::Candidate> { use std::collections::BTreeMap; use std::collections::btree_map::Entry as BTreeEntry; @@ -285,7 +287,7 @@ impl Table { match best_candidates.entry(group_id.clone()) { BTreeEntry::Occupied(mut occ) => { let candidate_ref = occ.get_mut(); - if *candidate_ref < candidate { + if *candidate_ref > candidate { *candidate_ref = candidate; } } @@ -293,32 +295,27 @@ impl Table { } } - best_candidates.values().map(|v| C::Candidate::clone(v)).collect::>() + best_candidates.values().cloned().collect::>() } - /// Get an iterator of all candidates with a given group. - // TODO: impl iterator - pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId) - -> Box> + 'a> - { - Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id)) - } - - /// Drain all misbehavior observed up to this point. - pub fn drain_misbehavior(&mut self) -> HashMap::Misbehavior> { - ::std::mem::replace(&mut self.detected_misbehavior, HashMap::new()) + /// Whether a candidate can be included. + pub fn candidate_includable(&self, digest: &C::Digest, context: &C) -> bool { + self.candidate_votes.get(digest).map_or(false, |data| { + let (v_threshold, a_threshold) = context.requisite_votes(&data.group_id); + data.can_be_included(v_threshold, a_threshold) + }) } /// Import a signed statement. Signatures should be checked for validity, and the - /// sender should be checked to actually be a validator. + /// sender should be checked to actually be a authority. /// /// This can note the origin of the statement to indicate that he has /// seen it already. pub fn import_statement( &mut self, context: &C, - statement: SignedStatement, - from: Option + statement: SignedStatement, + from: Option ) -> Option> { let SignedStatement { statement, signature, sender: signer } = statement; @@ -371,8 +368,143 @@ impl Table { maybe_summary } - fn note_trace_seen(&mut self, trace: StatementTrace, known_by: C::ValidatorId) { - self.validator_data.entry(known_by).or_insert_with(|| ValidatorData { + /// Get a candidate by digest. + pub fn get_candidate(&self, digest: &C::Digest) -> Option<&C::Candidate> { + self.candidate_votes.get(digest).map(|d| &d.candidate) + } + + /// Access all witnessed misbehavior. + pub fn get_misbehavior(&self) + -> &HashMap::Misbehavior> + { + &self.detected_misbehavior + } + + /// Fill a statement batch and note messages seen by the targets. + pub fn fill_batch(&mut self, batch: &mut B) + where B: StatementBatch< + C::AuthorityId, + SignedStatement, + > + { + // naively iterate all statements so far, taking any that + // at least one of the targets has not seen. + + // workaround for the fact that it's inconvenient to borrow multiple + // entries out of a hashmap mutably -- we just move them out and + // replace them when we're done. + struct SwappedTargetData<'a, C: 'a + Context> { + authority_data: &'a mut HashMap>, + target_data: Vec<(C::AuthorityId, AuthorityData)>, + } + + impl<'a, C: 'a + Context> Drop for SwappedTargetData<'a, C> { + fn drop(&mut self) { + for (id, data) in self.target_data.drain(..) { + self.authority_data.insert(id, data); + } + } + } + + // pre-fetch authority data for all the targets. + let mut target_data = { + let authority_data = &mut self.authority_data; + let mut target_data = Vec::with_capacity(batch.targets().len()); + for target in batch.targets() { + let active_data = match authority_data.get_mut(target) { + None => Default::default(), + Some(x) => ::std::mem::replace(x, Default::default()), + }; + + target_data.push((target.clone(), active_data)); + } + + SwappedTargetData { + authority_data, + target_data + } + }; + + let target_data = &mut target_data.target_data; + + macro_rules! attempt_send { + ($trace:expr, sender=$sender:expr, sig=$sig:expr, statement=$statement:expr) => {{ + let trace = $trace; + let can_send = target_data.iter() + .any(|t| !t.1.known_statements.contains(&trace)); + + if can_send { + let statement = SignedStatement { + statement: $statement, + signature: $sig, + sender: $sender, + }; + + if batch.push(statement) { + for target in target_data.iter_mut() { + target.1.known_statements.insert(trace.clone()); + } + } else { + return; + } + } + }} + } + + // reconstruct statements for anything whose trace passes the filter. + for (digest, candidate) in self.candidate_votes.iter() { + let issuance_iter = candidate.validity_votes.iter() + .filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { true } else { false }); + + let validity_iter = candidate.validity_votes.iter() + .filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { false } else { true }); + + // send issuance statements before votes. + for (sender, vote) in issuance_iter.chain(validity_iter) { + match *vote { + ValidityVote::Issued(ref sig) => { + attempt_send!( + StatementTrace::Candidate(sender.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Candidate(candidate.candidate.clone()) + ) + } + ValidityVote::Valid(ref sig) => { + attempt_send!( + StatementTrace::Valid(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Valid(digest.clone()) + ) + } + ValidityVote::Invalid(ref sig) => { + attempt_send!( + StatementTrace::Invalid(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Invalid(digest.clone()) + ) + } + } + }; + + + // and lastly send availability. + for (sender, sig) in candidate.availability_votes.iter() { + attempt_send!( + StatementTrace::Available(sender.clone(), digest.clone()), + sender = sender.clone(), + sig = sig.clone(), + statement = Statement::Available(digest.clone()) + ) + } + } + + } + + fn note_trace_seen(&mut self, trace: StatementTrace, known_by: C::AuthorityId) { + self.authority_data.entry(known_by).or_insert_with(|| AuthorityData { proposal: None, known_statements: HashSet::default(), }).known_statements.insert(trace); @@ -381,11 +513,11 @@ impl Table { fn import_candidate( &mut self, context: &C, - from: C::ValidatorId, + from: C::AuthorityId, candidate: C::Candidate, signature: C::Signature, ) -> (Option<::Misbehavior>, Option>) { - let group = context.candidate_group(&candidate); + let group = C::candidate_group(&candidate); if !context.is_member_of(&from, &group) { return ( Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement { @@ -399,10 +531,10 @@ impl Table { ); } - // check that validator hasn't already specified another candidate. - let digest = context.candidate_digest(&candidate); + // check that authority hasn't already specified another candidate. + let digest = C::candidate_digest(&candidate); - let new_proposal = match self.validator_data.entry(from.clone()) { + let new_proposal = match self.authority_data.entry(from.clone()) { Entry::Occupied(mut occ) => { // if digest is different, fetch candidate and // note misbehavior. @@ -411,7 +543,7 @@ impl Table { if let Some((ref old_digest, ref old_sig)) = existing.proposal { if old_digest != &digest { const EXISTENCE_PROOF: &str = - "when proposal first received from validator, candidate \ + "when proposal first received from authority, candidate \ votes entry is created. proposal here is `Some`, therefore \ candidate votes entry exists; qed"; @@ -436,7 +568,7 @@ impl Table { } } Entry::Vacant(vacant) => { - vacant.insert(ValidatorData { + vacant.insert(AuthorityData { proposal: Some((digest.clone(), signature.clone())), known_statements: HashSet::new(), }); @@ -467,7 +599,7 @@ impl Table { fn validity_vote( &mut self, context: &C, - from: C::ValidatorId, + from: C::AuthorityId, digest: C::Digest, vote: ValidityVote, ) -> (Option<::Misbehavior>, Option>) { @@ -476,7 +608,7 @@ impl Table { Some(votes) => votes, }; - // check that this validator actually can vote in this group. + // check that this authority actually can vote in this group. if !context.is_member_of(&from, &votes.group_id) { let (sig, valid) = match vote { ValidityVote::Valid(s) => (s, true), @@ -546,7 +678,7 @@ impl Table { fn availability_vote( &mut self, context: &C, - from: C::ValidatorId, + from: C::AuthorityId, digest: C::Digest, signature: C::Signature, ) -> (Option<::Misbehavior>, Option>) { @@ -555,7 +687,7 @@ impl Table { Some(votes) => votes, }; - // check that this validator actually can vote in this group. + // check that this authority actually can vote in this group. if !context.is_availability_guarantor_of(&from, &votes.group_id) { return ( Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement { @@ -577,10 +709,19 @@ impl Table { #[cfg(test)] mod tests { use super::*; + use ::tests::VecBatch; use std::collections::HashMap; + fn create() -> Table { + Table { + authority_data: HashMap::default(), + detected_misbehavior: HashMap::default(), + candidate_votes: HashMap::default(), + } + } + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] - struct ValidatorId(usize); + struct AuthorityId(usize); #[derive(Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq)] struct GroupId(usize); @@ -598,38 +739,38 @@ mod tests { #[derive(Debug, PartialEq, Eq)] struct TestContext { // v -> (validity, availability) - validators: HashMap + authorities: HashMap } impl Context for TestContext { - type ValidatorId = ValidatorId; + type AuthorityId = AuthorityId; type Digest = Digest; type Candidate = Candidate; type GroupId = GroupId; type Signature = Signature; - fn candidate_digest(&self, candidate: &Candidate) -> Digest { + fn candidate_digest(candidate: &Candidate) -> Digest { Digest(candidate.1) } - fn candidate_group(&self, candidate: &Candidate) -> GroupId { + fn candidate_group(candidate: &Candidate) -> GroupId { GroupId(candidate.0) } fn is_member_of( &self, - validator: &ValidatorId, + authority: &AuthorityId, group: &GroupId ) -> bool { - self.validators.get(validator).map(|v| &v.0 == group).unwrap_or(false) + self.authorities.get(authority).map(|v| &v.0 == group).unwrap_or(false) } fn is_availability_guarantor_of( &self, - validator: &ValidatorId, + authority: &AuthorityId, group: &GroupId ) -> bool { - self.validators.get(validator).map(|v| &v.1 == group).unwrap_or(false) + self.authorities.get(authority).map(|v| &v.1 == group).unwrap_or(false) } fn requisite_votes(&self, _id: &GroupId) -> (usize, usize) { @@ -640,9 +781,9 @@ mod tests { #[test] fn submitting_two_candidates_is_misbehavior() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); map } }; @@ -651,21 +792,21 @@ mod tests { let statement_a = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let statement_b = SignedStatement { statement: Statement::Candidate(Candidate(2, 999)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; table.import_statement(&context, statement_a, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); table.import_statement(&context, statement_b, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(1)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(1)).unwrap(), &Misbehavior::MultipleCandidates(MultipleCandidates { first: (Candidate(2, 100), Signature(1)), second: (Candidate(2, 999), Signature(1)), @@ -676,9 +817,9 @@ mod tests { #[test] fn submitting_candidate_from_wrong_group_is_misbehavior() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(3), GroupId(455))); + map.insert(AuthorityId(1), (GroupId(3), GroupId(455))); map } }; @@ -687,18 +828,18 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; table.import_statement(&context, statement, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(1)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(1)).unwrap(), &Misbehavior::UnauthorizedStatement(UnauthorizedStatement { statement: SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }, }) ); @@ -707,10 +848,10 @@ mod tests { #[test] fn unauthorized_votes() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); - map.insert(ValidatorId(2), (GroupId(3), GroupId(222))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(2), (GroupId(3), GroupId(222))); map } }; @@ -720,56 +861,56 @@ mod tests { let candidate_a = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let candidate_a_digest = Digest(100); let candidate_b = SignedStatement { statement: Statement::Candidate(Candidate(3, 987)), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; let candidate_b_digest = Digest(987); table.import_statement(&context, candidate_a, None); table.import_statement(&context, candidate_b, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); - // validator 1 votes for availability on 2's candidate. + // authority 1 votes for availability on 2's candidate. let bad_availability_vote = SignedStatement { statement: Statement::Available(candidate_b_digest.clone()), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; table.import_statement(&context, bad_availability_vote, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(1)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(1)).unwrap(), &Misbehavior::UnauthorizedStatement(UnauthorizedStatement { statement: SignedStatement { statement: Statement::Available(candidate_b_digest), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }, }) ); - // validator 2 votes for validity on 1's candidate. + // authority 2 votes for validity on 1's candidate. let bad_validity_vote = SignedStatement { statement: Statement::Valid(candidate_a_digest.clone()), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; table.import_statement(&context, bad_validity_vote, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(2)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(2)).unwrap(), &Misbehavior::UnauthorizedStatement(UnauthorizedStatement { statement: SignedStatement { statement: Statement::Valid(candidate_a_digest), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }, }) ); @@ -778,10 +919,10 @@ mod tests { #[test] fn validity_double_vote_is_misbehavior() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); - map.insert(ValidatorId(2), (GroupId(2), GroupId(246))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(2), (GroupId(2), GroupId(246))); map } }; @@ -790,32 +931,32 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let candidate_digest = Digest(100); table.import_statement(&context, statement, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let valid_statement = SignedStatement { statement: Statement::Valid(candidate_digest.clone()), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; let invalid_statement = SignedStatement { statement: Statement::Invalid(candidate_digest.clone()), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; table.import_statement(&context, valid_statement, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); table.import_statement(&context, invalid_statement, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(2)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(2)).unwrap(), &Misbehavior::ValidityDoubleVote(ValidityDoubleVote::ValidityAndInvalidity( candidate_digest, Signature(2), @@ -827,9 +968,9 @@ mod tests { #[test] fn issue_and_vote_is_misbehavior() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); map } }; @@ -838,22 +979,22 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let candidate_digest = Digest(100); table.import_statement(&context, statement, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let extra_vote = SignedStatement { statement: Statement::Valid(candidate_digest.clone()), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; table.import_statement(&context, extra_vote, None); assert_eq!( - table.detected_misbehavior.get(&ValidatorId(1)).unwrap(), + table.detected_misbehavior.get(&AuthorityId(1)).unwrap(), &Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity( (Candidate(2, 100), Signature(1)), (Digest(100), Signature(1)), @@ -877,18 +1018,18 @@ mod tests { assert!(!candidate.can_be_included(validity_threshold, availability_threshold)); for i in 0..validity_threshold { - candidate.validity_votes.insert(ValidatorId(i + 100), ValidityVote::Valid(Signature(i + 100))); + candidate.validity_votes.insert(AuthorityId(i + 100), ValidityVote::Valid(Signature(i + 100))); } assert!(!candidate.can_be_included(validity_threshold, availability_threshold)); for i in 0..availability_threshold { - candidate.availability_votes.insert(ValidatorId(i + 255), Signature(i + 255)); + candidate.availability_votes.insert(AuthorityId(i + 255), Signature(i + 255)); } assert!(candidate.can_be_included(validity_threshold, availability_threshold)); - candidate.indicated_bad_by.push(ValidatorId(1024)); + candidate.indicated_bad_by.push(AuthorityId(1024)); assert!(!candidate.can_be_included(validity_threshold, availability_threshold)); } @@ -896,9 +1037,9 @@ mod tests { #[test] fn candidate_import_gives_summary() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); map } }; @@ -907,7 +1048,7 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let summary = table.import_statement(&context, statement, None) @@ -922,10 +1063,10 @@ mod tests { #[test] fn candidate_vote_gives_summary() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); - map.insert(ValidatorId(2), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(2), (GroupId(2), GroupId(455))); map } }; @@ -934,23 +1075,23 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let candidate_digest = Digest(100); table.import_statement(&context, statement, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let vote = SignedStatement { statement: Statement::Valid(candidate_digest.clone()), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; let summary = table.import_statement(&context, vote, None) .expect("candidate vote to give summary"); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); assert_eq!(summary.candidate, Digest(100)); assert_eq!(summary.group_id, GroupId(2)); @@ -961,10 +1102,10 @@ mod tests { #[test] fn availability_vote_gives_summary() { let context = TestContext { - validators: { + authorities: { let mut map = HashMap::new(); - map.insert(ValidatorId(1), (GroupId(2), GroupId(455))); - map.insert(ValidatorId(2), (GroupId(5), GroupId(2))); + map.insert(AuthorityId(1), (GroupId(2), GroupId(455))); + map.insert(AuthorityId(2), (GroupId(5), GroupId(2))); map } }; @@ -973,27 +1114,78 @@ mod tests { let statement = SignedStatement { statement: Statement::Candidate(Candidate(2, 100)), signature: Signature(1), - sender: ValidatorId(1), + sender: AuthorityId(1), }; let candidate_digest = Digest(100); table.import_statement(&context, statement, None); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1))); let vote = SignedStatement { statement: Statement::Available(candidate_digest.clone()), signature: Signature(2), - sender: ValidatorId(2), + sender: AuthorityId(2), }; let summary = table.import_statement(&context, vote, None) .expect("candidate vote to give summary"); - assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2))); + assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2))); assert_eq!(summary.candidate, Digest(100)); assert_eq!(summary.group_id, GroupId(2)); assert_eq!(summary.validity_votes, 1); assert_eq!(summary.availability_votes, 1); } + + #[test] + fn filling_batch_sets_known_flag() { + let context = TestContext { + authorities: { + let mut map = HashMap::new(); + for i in 1..10 { + map.insert(AuthorityId(i), (GroupId(2), GroupId(400 + i))); + } + map + } + }; + + let mut table = create(); + let statement = SignedStatement { + statement: Statement::Candidate(Candidate(2, 100)), + signature: Signature(1), + sender: AuthorityId(1), + }; + + table.import_statement(&context, statement, None); + + for i in 2..10 { + let statement = SignedStatement { + statement: Statement::Valid(Digest(100)), + signature: Signature(i), + sender: AuthorityId(i), + }; + + table.import_statement(&context, statement, None); + } + + let mut batch = VecBatch { + max_len: 5, + targets: (1..10).map(AuthorityId).collect(), + items: Vec::new(), + }; + + // 9 statements in the table, each seen by one. + table.fill_batch(&mut batch); + assert_eq!(batch.items.len(), 5); + + // 9 statements in the table, 5 of which seen by all targets. + batch.items.clear(); + table.fill_batch(&mut batch); + assert_eq!(batch.items.len(), 4); + + batch.items.clear(); + table.fill_batch(&mut batch); + assert!(batch.items.is_empty()); + } } diff --git a/candidate-agreement/src/tests/mod.rs b/candidate-agreement/src/tests/mod.rs new file mode 100644 index 0000000000000..1599a94aa69b7 --- /dev/null +++ b/candidate-agreement/src/tests/mod.rs @@ -0,0 +1,385 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tests and test helpers for the candidate agreement. + +const VALIDITY_CHECK_DELAY_MS: u64 = 100; +const AVAILABILITY_CHECK_DELAY_MS: u64 = 100; +const PROPOSAL_FORMATION_TICK_MS: u64 = 50; +const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200; +const TIMER_TICK_DURATION_MS: u64 = 10; + +use std::collections::HashMap; + +use futures::prelude::*; +use futures::sync::mpsc; +use tokio_timer::Timer; + +use super::*; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)] +struct AuthorityId(usize); + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] +struct Digest(Vec); + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] +struct GroupId(usize); + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)] +struct ParachainCandidate { + group: GroupId, + data: usize, +} + +#[derive(PartialEq, Eq, Debug, Clone)] +struct Proposal { + candidates: Vec, +} + +#[derive(PartialEq, Eq, Debug, Clone)] +enum Signature { + Table(AuthorityId, table::Statement), + Bft(AuthorityId, bft::Message), +} + +enum Error { + Timer(tokio_timer::TimerError), + NetOut, + NetIn, +} + +#[derive(Debug, Clone)] +struct SharedTestContext { + n_authorities: usize, + n_groups: usize, + timer: Timer, +} + +#[derive(Debug, Clone)] +struct TestContext { + shared: Arc, + local_id: AuthorityId, +} + +impl Context for TestContext { + type AuthorityId = AuthorityId; + type Digest = Digest; + type GroupId = GroupId; + type Signature = Signature; + type Proposal = Proposal; + type ParachainCandidate = ParachainCandidate; + + type CheckCandidate = Box>; + type CheckAvailability = Box>; + + type StatementBatch = VecBatch< + AuthorityId, + table::SignedStatement + >; + + fn candidate_digest(candidate: &ParachainCandidate) -> Digest { + Digest(vec![candidate.group.0, candidate.data]) + } + + fn proposal_digest(candidate: &Proposal) -> Digest { + Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| { + a.extend(Self::candidate_digest(c).0); + a + })) + } + + fn candidate_group(candidate: &ParachainCandidate) -> GroupId { + candidate.group.clone() + } + + fn round_proposer(&self, round: usize) -> AuthorityId { + AuthorityId(round % self.shared.n_authorities) + } + + fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate { + let future = self.shared.timer + .sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS)) + .map_err(Error::Timer) + .map(|_| true); + + Box::new(future) + } + + fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability { + let future = self.shared.timer + .sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS)) + .map_err(Error::Timer) + .map(|_| true); + + Box::new(future) + } + + fn create_proposal(&self, candidates: Vec<&ParachainCandidate>) + -> Option + { + let t = self.shared.n_groups * 2 / 3; + if candidates.len() >= t { + Some(Proposal { + candidates: candidates.iter().map(|x| (&**x).clone()).collect() + }) + } else { + None + } + } + + fn proposal_valid(&self, proposal: &Proposal, check_candidate: F) -> bool + where F: FnMut(&ParachainCandidate) -> bool + { + if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 { + proposal.candidates.iter().all(check_candidate) + } else { + false + } + } + + fn local_id(&self) -> AuthorityId { + self.local_id.clone() + } + + fn sign_table_statement( + &self, + statement: &table::Statement + ) -> Signature { + Signature::Table(self.local_id(), statement.clone()) + } + + fn sign_bft_message(&self, message: &bft::Message) -> Signature { + Signature::Bft(self.local_id(), message.clone()) + } +} + +struct TestRecovery; + +impl MessageRecovery for TestRecovery { + type UncheckedMessage = OutgoingMessage; + + fn check_message(&self, msg: Self::UncheckedMessage) -> Option> { + Some(match msg { + OutgoingMessage::Bft(c) => CheckedMessage::Bft(c), + OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items), + }) + } +} + +pub struct Network { + endpoints: Vec>, + input: mpsc::UnboundedReceiver<(usize, T)>, +} + +impl Network { + pub fn new(nodes: usize) + -> (Self, Vec>, Vec>) + { + let mut inputs = Vec::with_capacity(nodes); + let mut outputs = Vec::with_capacity(nodes); + let mut endpoints = Vec::with_capacity(nodes); + + let (in_tx, in_rx) = mpsc::unbounded(); + for _ in 0..nodes { + let (out_tx, out_rx) = mpsc::unbounded(); + inputs.push(in_tx.clone()); + outputs.push(out_rx); + endpoints.push(out_tx); + } + + let network = Network { + endpoints, + input: in_rx, + }; + + (network, inputs, outputs) + } + + pub fn route_on_thread(self) { + ::std::thread::spawn(move || { let _ = self.wait(); }); + } +} + +impl Future for Network { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), Self::Error> { + match try_ready!(self.input.poll()) { + None => Ok(Async::Ready(())), + Some((sender, item)) => { + { + let receiving_endpoints = self.endpoints + .iter() + .enumerate() + .filter(|&(i, _)| i != sender) + .map(|(_, x)| x); + + for endpoint in receiving_endpoints { + let _ = endpoint.unbounded_send(item.clone()); + } + } + + self.poll() + } + } + } +} + +#[derive(Debug, Clone)] +pub struct VecBatch { + pub max_len: usize, + pub targets: Vec, + pub items: Vec, +} + +impl ::StatementBatch for VecBatch { + fn targets(&self) -> &[V] { &self.targets } + fn is_empty(&self) -> bool { self.items.is_empty() } + fn push(&mut self, item: T) -> bool { + if self.items.len() == self.max_len { + false + } else { + self.items.push(item); + true + } + } +} + +fn make_group_assignments(n_authorities: usize, n_groups: usize) + -> HashMap> +{ + let mut map = HashMap::new(); + let threshold = (n_authorities / n_groups) / 2; + let make_blank_group = || { + GroupInfo { + validity_guarantors: HashSet::new(), + availability_guarantors: HashSet::new(), + needed_validity: threshold, + needed_availability: threshold, + } + }; + + // every authority checks validity of his ID modulo n_groups and + // guarantees availability for the group above that. + for a_id in 0..n_authorities { + let primary_group = a_id % n_groups; + let availability_groups = [ + (a_id + 1) % n_groups, + a_id.wrapping_sub(1) % n_groups, + ]; + + map.entry(GroupId(primary_group)) + .or_insert_with(&make_blank_group) + .validity_guarantors + .insert(AuthorityId(a_id)); + + for &availability_group in &availability_groups { + map.entry(GroupId(availability_group)) + .or_insert_with(&make_blank_group) + .availability_guarantors + .insert(AuthorityId(a_id)); + } + } + + map +} + +fn make_blank_batch(n_authorities: usize) -> VecBatch { + VecBatch { + max_len: 20, + targets: (0..n_authorities).map(AuthorityId).collect(), + items: Vec::new(), + } +} + +#[test] +fn consensus_completes_with_minimum_good() { + let n = 50; + let f = 16; + let n_groups = 10; + + let timer = ::tokio_timer::wheel() + .tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS)) + .num_slots(1 << 16) + .build(); + + let (network, inputs, outputs) = Network::<(AuthorityId, OutgoingMessage)>::new(n - f); + network.route_on_thread(); + + let shared_test_context = Arc::new(SharedTestContext { + n_authorities: n, + n_groups: n_groups, + timer: timer.clone(), + }); + + let groups = make_group_assignments(n, n_groups); + + let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| { + let id = AuthorityId(raw_id); + let context = TestContext { + shared: shared_test_context.clone(), + local_id: id, + }; + + let shared_table = SharedTable::new(context.clone(), groups.clone()); + let params = AgreementParams { + context, + timer: timer.clone(), + table: shared_table, + nodes: n, + max_faulty: f, + round_timeout_multiplier: 4, + message_buffer_size: 100, + form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS), + }; + + let net_out = input + .sink_map_err(|_| Error::NetOut) + .with(move |x| Ok::<_, Error>((id.0, (id, x))) ); + + let net_in = output + .map_err(|_| Error::NetIn) + .map(move |(v, msg)| (v, vec![msg])); + + let propagate_statements = timer + .interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS)) + .map(move |()| make_blank_batch(n)) + .map_err(Error::Timer); + + let local_candidate = if raw_id < n_groups { + let candidate = ParachainCandidate { + group: GroupId(raw_id), + data: raw_id, + }; + ::futures::future::Either::A(Ok::<_, Error>(candidate).into_future()) + } else { + ::futures::future::Either::B(::futures::future::empty()) + }; + + agree::<_, _, _, _, _, _, Error>( + params, + net_in, + net_out, + TestRecovery, + propagate_statements, + local_candidate + ) + }).collect::>(); + + futures::future::join_all(authorities).wait().unwrap(); +}