Skip to content

Commit

Permalink
Merge branch 'nuno1212s:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
nuno1212s authored Nov 27, 2023
2 parents f23f60f + 8d563b1 commit 41735a6
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 96 deletions.
7 changes: 4 additions & 3 deletions febft-pbft-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ default = [
"serialize_serde"
]

serialize_serde = ["atlas-capnp", "serde", "serde_bytes", "bincode", "atlas-common/serialize_serde",
serialize_serde = ["atlas-capnp", "serde_bytes", "bincode", "atlas-common/serialize_serde",
"atlas-smr-application/serialize_serde", "atlas-communication/serialize_serde", "atlas-core/serialize_serde"]
serialize_capnp = ["atlas-capnp"]

Expand All @@ -38,6 +38,8 @@ mimalloc = { version = "*", default-features = false }
rand = {version = "0.8.5", features = ["small_rng"] }

[dependencies]
anyhow = "1.0.75"
thiserror = "1.0.50"
atlas-common = { path = "../../Atlas/Atlas-Common" }
atlas-communication = { path = "../../Atlas/Atlas-Communication" }
atlas-smr-application = { path = "../../Atlas/Atlas-SMR-Application" }
Expand Down Expand Up @@ -74,7 +76,7 @@ ring = { version = "0.16", optional = true }
threadpool-crossbeam-channel = { version = "1.8.0", optional = true }
#async-semaphore = { version = "1", optional = true }

serde = { version = "*", features = ["derive", "rc"], optional = true }
serde = { version = "*", features = ["derive", "rc"]}
bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true }

flume = { version = "0.10", optional = true }
Expand All @@ -94,7 +96,6 @@ log4rs = { version = "1.1.1", features = ["file_appender"] }
#tracing = "0.1.32"
#tracing-subscriber = { version = "0.3.11", features = ["fmt"] }

thiserror = "1.0.32"
num-bigint = "0.4.3"
num-traits = "0.2.15"

Expand Down
27 changes: 6 additions & 21 deletions febft-pbft-consensus/src/bft/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,25 @@
use std::time::Duration;
use serde::Deserialize;

use atlas_common::node_id::NodeId;
use atlas_core::followers::FollowerHandle;
use atlas_smr_application::serialize::ApplicationData;

use crate::bft::message::serialize::PBFTConsensus;
use crate::bft::sync::view::ViewInfo;

pub struct PBFTConfig<D>
where D: ApplicationData + 'static {
pub node_id: NodeId,
// pub observer_handle: ObserverHandle,
pub follower_handle: Option<FollowerHandle<D, PBFTConsensus<D>, PBFTConsensus<D>>>,
pub view: ViewInfo,
#[derive(Debug, Deserialize)]
pub struct PBFTConfig {
pub timeout_dur: Duration,
pub proposer_config: ProposerConfig,
pub watermark: u32,
}

impl<D: ApplicationData + 'static, > PBFTConfig<D> {
pub fn new(node_id: NodeId,
follower_handle: Option<FollowerHandle<D, PBFTConsensus<D>, PBFTConsensus<D>>>,
view: ViewInfo, timeout_dur: Duration,
impl PBFTConfig {
pub fn new(timeout_dur: Duration,
watermark: u32, proposer_config: ProposerConfig) -> Self {
Self {
node_id,
// observer_handle,
follower_handle,
view,
timeout_dur,
proposer_config,
watermark,
}
}
}

#[derive(Debug, Deserialize)]
pub struct ProposerConfig {
pub target_batch_size: u64,
pub max_batch_size: u64,
Expand Down
16 changes: 14 additions & 2 deletions febft-pbft-consensus/src/bft/consensus/decision/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::time::Instant;

use chrono::Utc;
use log::{debug, info, warn};
use thiserror::Error;
use atlas_common::Err;

use atlas_common::error::*;
use atlas_common::node_id::NodeId;
Expand Down Expand Up @@ -552,10 +554,12 @@ impl<D> ConsensusDecision<D>
/// Finalize this consensus decision and return the information about the batch
pub fn finalize(self) -> Result<CompletedBatch<D::Request>> {
if let DecisionPhase::Decided = self.phase {
let seq = self.sequence_number();

self.working_log.finish_processing_batch()
.ok_or(Error::simple_with_msg(ErrorKind::Consensus, "Failed to finalize batch"))
.ok_or(anyhow::Error::from(DecisionError::FailedToFinalizeBatch(seq)))
} else {
Err(Error::simple_with_msg(ErrorKind::Consensus, "Cannot finalize batch that is not decided"))
Err!(DecisionError::CannotFinalizeUndecidedBatch(self.sequence_number()))
}
}

Expand Down Expand Up @@ -624,4 +628,12 @@ impl<O> Debug for DecisionPollStatus<O> {
}
}
}
}

#[derive(Error, Debug)]
pub enum DecisionError {
#[error("Unable to finalize an undecided batch {0:?}")]
CannotFinalizeUndecidedBatch(SeqNo),
#[error("Failed to finalize a batch {0:?}")]
FailedToFinalizeBatch(SeqNo),
}
48 changes: 30 additions & 18 deletions febft-pbft-consensus/src/bft/log/deciding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use thiserror::Error;

use atlas_common::crypto::hash::{Context, Digest};
use atlas_common::Err;
use atlas_common::error::*;
use atlas_common::node_id::NodeId;
use atlas_common::ordering::{Orderable, SeqNo};
Expand Down Expand Up @@ -161,18 +163,13 @@ impl<O> WorkingDecisionLog<O> where O: Clone {
let sending_leader = header.from();

let slice = self.request_space_slices.get(&sending_leader)
.ok_or(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog,
format!("Failed to get request space for leader {:?}. Len: {:?}. {:?}",
sending_leader,
self.request_space_slices.len(),
self.request_space_slices).as_str()))?;
.ok_or(DecidingLogError::FailedToGetLeadersRequestSpace(sending_leader))?;

if sending_leader != self.node_id {
// Only check batches from other leaders since we implicitly trust in ourselves
for request in &batch_rq_digests {
if !crate::bft::sync::view::is_request_in_hash_space(&request.digest(), slice) {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog,
"This batch contains requests that are not in the hash space of the leader."));
return Err!(DecidingLogError::BatchContainsRequestsNotInLeaderAddrSpace(sending_leader));
}
}
}
Expand All @@ -183,7 +180,7 @@ impl<O> WorkingDecisionLog<O> where O: Clone {
let leader_index = pre_prepare_index_of(&self.leader_set, &sending_leader)?;

if leader_index >= self.pre_prepare_digests.len() {
unreachable!("Cannot insert a pre prepare message that was sent by a leader that is out of bounds")
return Err!(DecidingLogError::LeaderNotInLeaderSet(sending_leader));
}

self.pre_prepare_digests[leader_index] = Some(digest);
Expand All @@ -210,7 +207,7 @@ impl<O> WorkingDecisionLog<O> where O: Clone {
let result = self.calculate_instance_digest();

let (digest, ordering) = result
.ok_or(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog, "Failed to calculate instance digest"))?;
.ok_or(DecidingLogError::FailedToCalculateDigest(self.seq_no))?;

self.batch_digest = Some(digest.clone());

Expand Down Expand Up @@ -382,24 +379,21 @@ impl<O> Orderable for WorkingDecisionLog<O> {
impl DuplicateReplicaEvaluator {
fn insert_pre_prepare_received(&mut self, node_id: NodeId) -> Result<()> {
if !self.received_pre_prepare_messages.insert(node_id) {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog,
"We have already received a message from that leader."));
return Err!(DecidingLogError::DuplicateVoteFromNode(node_id));
}

Ok(())
}
fn insert_prepare_received(&mut self, node_id: NodeId) -> Result<()> {
if !self.received_prepare_messages.insert(node_id) {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog,
"We have already received a message from that leader."));
return Err!(DecidingLogError::DuplicateVoteFromNode(node_id));
}

Ok(())
}
fn insert_commit_received(&mut self, node_id: NodeId) -> Result<()> {
if !self.received_commit_messages.insert(node_id) {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidingLog,
"We have already received a message from that leader."));
return Err!(DecidingLogError::DuplicateVoteFromNode(node_id));
}

Ok(())
Expand All @@ -410,7 +404,7 @@ impl DuplicateReplicaEvaluator {
pub fn pre_prepare_index_from_digest_opt(prepare_set: &Vec<Option<Digest>>, digest: &Digest) -> Result<usize> {
match prepare_set.iter().position(|pre_prepare| pre_prepare.map(|d| d == *digest).unwrap_or(false)) {
None => {
Err(Error::simple_with_msg(ErrorKind::Consensus, "Pre prepare is not part of the pre prepare set"))
Err!(DecidingLogError::PrePrepareNotPartOfSet(digest.clone(), prepare_set.iter().cloned().filter(Option::is_some).map(|r| r.unwrap()).collect()))
}
Some(pos) => {
Ok(pos)
Expand All @@ -421,7 +415,7 @@ pub fn pre_prepare_index_from_digest_opt(prepare_set: &Vec<Option<Digest>>, dige
pub fn pre_prepare_index_of_from_digest(prepare_set: &Vec<Digest>, preprepare: &Digest) -> Result<usize> {
match prepare_set.iter().position(|pre_prepare| *pre_prepare == *preprepare) {
None => {
Err(Error::simple_with_msg(ErrorKind::Consensus, "Pre prepare is not part of the pre prepare set"))
Err!(DecidingLogError::PrePrepareNotPartOfSet(preprepare.clone(), prepare_set.clone()))
}
Some(pos) => {
Ok(pos)
Expand All @@ -432,10 +426,28 @@ pub fn pre_prepare_index_of_from_digest(prepare_set: &Vec<Digest>, preprepare: &
pub fn pre_prepare_index_of(leader_set: &Vec<NodeId>, proposer: &NodeId) -> Result<usize> {
match leader_set.iter().position(|node| *node == *proposer) {
None => {
Err(Error::simple_with_msg(ErrorKind::Consensus, "Proposer is not part of the leader set"))
Err!(DecidingLogError::ProposerNotInLeaderSet(proposer.clone(), leader_set.clone()))
}
Some(pos) => {
Ok(pos)
}
}
}

#[derive(Error, Debug)]
pub enum DecidingLogError {
#[error("Proposer is not part of leader set {0:?}, leader set {1:?}")]
ProposerNotInLeaderSet(NodeId, Vec<NodeId>),
#[error("Pre prepare is not part of the pre prepare set. {0:?} vs {1:?}")]
PrePrepareNotPartOfSet(Digest, Vec<Digest>),
#[error("We have received a duplicate vote from node {0:?}")]
DuplicateVoteFromNode(NodeId),
#[error("Failed to calculate instance digest {0:?}")]
FailedToCalculateDigest(SeqNo),
#[error("Failed leader not in leader set")]
LeaderNotInLeaderSet(NodeId),
#[error("Batch contains requests that are not in leader's ({0:?}) address space")]
BatchContainsRequestsNotInLeaderAddrSpace(NodeId),
#[error("Failed to get leader's request space {0:?}")]
FailedToGetLeadersRequestSpace(NodeId),
}
24 changes: 18 additions & 6 deletions febft-pbft-consensus/src/bft/log/decisions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::ops::Deref;

#[cfg(feature = "serialize_serde")]
use serde::{Deserialize, Serialize};
use thiserror::Error;

use atlas_common::crypto::hash::Digest;
use atlas_common::Err;
use atlas_common::error::*;
use atlas_common::ordering::{Orderable, SeqNo};
use atlas_core::ordering_protocol::networking::serialize::OrderProtocolProof;
Expand Down Expand Up @@ -135,7 +137,7 @@ impl<O> Proof<O> {
ConsensusMessageKind::PrePrepare(_) => {
let option = metadata.pre_prepare_ordering().iter().position(|digest| *x.header().digest() == *digest);

let index = option.ok_or(Error::simple_with_msg(ErrorKind::OrderProtocolProof, "Failed to create proof as pre prepare is not contained in metadata ordering"))?;
let index = option.ok_or(ProofError::PrePrepareNotContainedInMetadata)?;

pre_prepares[index] = Some(x);
}
Expand All @@ -151,7 +153,7 @@ impl<O> Proof<O> {
let mut pre_prepares_f = Vec::with_capacity(metadata.pre_prepare_ordering().len());

for message in pre_prepares.into_iter() {
pre_prepares_f.push(message.ok_or(Error::simple_with_msg(ErrorKind::OrderProtocolProof, "Failed to create proof as pre prepare list is not complete"))?);
pre_prepares_f.push(message.ok_or(ProofError::PrePrepareListNotComplete)?);
}

Ok(Self {
Expand Down Expand Up @@ -184,8 +186,7 @@ impl<O> Proof<O> {
/// Check if the amount of pre prepares line up with the expected amount
fn check_pre_prepare_sizes(&self) -> Result<()> {
if self.metadata.pre_prepare_ordering().len() != self.pre_prepares.len() {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecisions,
"Wrong number of pre prepares."));
return Err!(ProofError::WrongPrePrepareCount(self.metadata.pre_prepare_ordering().len(), self.pre_prepares.len()));
}

Ok(())
Expand Down Expand Up @@ -225,8 +226,7 @@ impl<O> Proof<O> {
ordered_pre_prepares.push(message);
}
None => {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecisions,
"Proof's batches do not match with the digests provided."));
return Err!(ProofError::BatchDigestsDoNotMatch);
}
}
}
Expand Down Expand Up @@ -367,4 +367,16 @@ impl<O> Debug for CollectData<O> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "CollectData {{ incomplete_proof: {:?}, last_proof: {:?} }}", self.incomplete_proof, self.last_proof)
}
}

#[derive(Error, Debug)]
pub enum ProofError {
#[error("Failed to create proof as pre prepare request was not contained within the provided metadata")]
PrePrepareNotContainedInMetadata,
#[error("Failed to create proof as pre prepare list is not complete")]
PrePrepareListNotComplete,
#[error("Failed to create proof as there is a wrong pre prepare count expected {0:?} got {1:?}")]
WrongPrePrepareCount(usize, usize),
#[error("Proof's batches do not match with the digests provided.")]
BatchDigestsDoNotMatch,
}
28 changes: 24 additions & 4 deletions febft-pbft-consensus/src/bft/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use either::Either;
use thiserror::Error;
use atlas_common::Err;

use atlas_common::error::*;
use atlas_common::node_id::NodeId;
Expand Down Expand Up @@ -36,15 +38,19 @@ impl<D> Log<D> where D: ApplicationData {
if let Some(decision) = self.decision_log().last_execution() {
match proof.seq_no().index(decision) {
Either::Left(_) | Either::Right(0) => {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidedLog,
"Cannot install proof as we already have a decision that is >= to the one that was provided"));
return Err!(LogError::CannotInstallDecisionAlreadyAhead {
already_installed: decision,
install_attempt: proof.sequence_number()
});
}
Either::Right(1) => {
self.decided.append_proof(proof.clone());
}
Either::Right(_) => {
return Err(Error::simple_with_msg(ErrorKind::MsgLogDecidedLog,
"Cannot install proof as it would involve skipping a decision that we are not aware of"));
return Err!(LogError::CannotInstallWouldSkip {
install_attempt: proof.sequence_number(),
currently_installed: decision
});
}
}
}
Expand Down Expand Up @@ -156,3 +162,17 @@ impl<O> From<&Proof<O>> for ProtocolConsensusDecision<O> where O: Clone {
value.metadata().batch_digest())
}
}

#[derive(Error, Debug)]
pub enum LogError {
#[error("Failed to install decision {install_attempt:?} as we already have decision {already_installed:?}")]
CannotInstallDecisionAlreadyAhead {
install_attempt: SeqNo,
already_installed: SeqNo,
},
#[error("Failed to install decision {install_attempt:?} as we are only on decision {currently_installed:?}")]
CannotInstallWouldSkip {
install_attempt: SeqNo,
currently_installed: SeqNo,
},
}
5 changes: 3 additions & 2 deletions febft-pbft-consensus/src/bft/message/serialize/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::{Read, Write};
use atlas_common::error::*;
use crate::bft::message::serialize::ApplicationData;
use crate::bft::message::{ConsensusMessage};
use anyhow::Context;

pub fn serialize_consensus<W, D>(
m: &ConsensusMessage<D::Request>,
Expand All @@ -11,7 +12,7 @@ pub fn serialize_consensus<W, D>(
D: ApplicationData {

bincode::serde::encode_into_std_write(m, w, bincode::config::standard())
.wrapped_msg(ErrorKind::CommunicationSerialize, format!("Failed to serialize message {} bytes len", w.as_mut().len()).as_str())?;
.context(format!("Failed to serialize message {} bytes len", w.as_mut().len()))?;

Ok(())
}
Expand All @@ -20,7 +21,7 @@ pub fn deserialize_consensus<R, D>(
r: R
) -> Result<ConsensusMessage<D::Request>> where D: ApplicationData, R: Read + AsRef<[u8]> {
let msg = bincode::serde::decode_borrowed_from_slice(r.as_ref(), bincode::config::standard())
.wrapped_msg(ErrorKind::CommunicationSerialize, "Failed to deserialize message")?;
.context("Failed to deserialize message")?;

Ok(msg)
}
Loading

0 comments on commit 41735a6

Please sign in to comment.