Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
refactor update current status
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason committed Sep 26, 2019
1 parent e9635bf commit 49ed197
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 118 deletions.
19 changes: 12 additions & 7 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use protocol::traits::{
use protocol::types::{Address, Epoch, Hash, Proof, Receipt, SignedTransaction, Validator};
use protocol::ProtocolResult;

use crate::fixed_types::{
ConsensusRpcRequest, FixedEpochID, FixedEpochs, FixedSignedTxs, PullTxsRequest,
};
use crate::fixed_types::{ConsensusRpcRequest, ConsensusRpcResponse, FixedEpochID, PullTxsRequest};
use crate::{ConsensusError, MsgType};

pub struct OverlordConsensusAdapter<
EF: ExecutorFactory<DB>,
Expand Down Expand Up @@ -150,15 +149,18 @@ where
let msg = FixedEpochID::new(epoch_id);
let res = self
.rpc
.call::<ConsensusRpcRequest, FixedEpochs>(
.call::<ConsensusRpcRequest, ConsensusRpcResponse>(
ctx,
end,
ConsensusRpcRequest::PullEpochs(msg),
Priority::High,
)
.await?;

Ok(res.inner)
match res {
ConsensusRpcResponse::PullEpochs(epoch) => Ok(epoch.inner),
_ => Err(ConsensusError::RpcErr(MsgType::RpcPullEpochs).into()),
}
}

async fn pull_txs(
Expand All @@ -170,15 +172,18 @@ where
let msg = PullTxsRequest::new(hashes);
let res = self
.rpc
.call::<ConsensusRpcRequest, FixedSignedTxs>(
.call::<ConsensusRpcRequest, ConsensusRpcResponse>(
ctx,
end,
ConsensusRpcRequest::PullTxs(msg),
Priority::High,
)
.await?;

Ok(res.inner)
match res {
ConsensusRpcResponse::PullTxs(txs) => Ok(txs.inner),
_ => Err(ConsensusError::RpcErr(MsgType::RpcPullTxs).into()),
}
}

async fn get_epoch_by_id(&self, _ctx: Context, epoch_id: u64) -> ProtocolResult<Epoch> {
Expand Down
31 changes: 23 additions & 8 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,36 @@ impl<Adapter: ConsensusAdapter + 'static> Consensus for OverlordConsensus<Adapte
}
}

// Need to synchronization.
self.engine.lock.lock().await;
let mut prev_hash = self
.engine
.get_epoch_by_id(ctx.clone(), current_epoch_id + 1)
.await?
.header
.pre_hash;

// Check epoch for the first time.
let epoch_header = self
.engine
.pull_epoch(ctx.clone(), current_epoch_id + 1)
.await?
.header;
self.check_proof(current_epoch_id + 1, epoch_header.proof.clone())?;
if prev_hash != epoch_header.pre_hash {
return Err(ConsensusError::SyncEpochHashErr(current_epoch_id + 1).into());
}

// Lock the consensus engine, block commit process.
self.engine.lock.lock().await;

// Start to synchronization.
for id in (current_epoch_id + 1)..=rich_epoch_id {
// First pull a new block.
let epoch = self.engine.pull_epoch(ctx.clone(), id).await?;
let tmp_prev_hash = epoch.header.pre_hash.clone();

// Check proof and previous hash.
self.check_proof(id, epoch.header.proof.clone())?;
let proof = epoch.header.proof.clone();
self.check_proof(id, proof.clone())?;
if prev_hash != epoch.header.pre_hash {
return Err(ConsensusError::SyncEpochHashErr(id).into());
}
Expand All @@ -114,15 +128,16 @@ impl<Adapter: ConsensusAdapter + 'static> Consensus for OverlordConsensus<Adapte
// 2. Save the signed transactions.
// 3. Save the latest proof.
// 4. Save the new epoch.
let _ = self
let exec_resp = self
.engine
.exec(Address::User(epoch.header.proposer.clone()), txs.clone())
.await?;
self.engine.save_signed_txs(txs).await?;
self.engine.save_proof(epoch.header.proof.clone()).await?;
self.engine.save_epoch(epoch).await?;

// Update the previous hash for next check.
self.engine
.update_status(id, epoch, proof, exec_resp, txs)
.await?;

// Update the previous hash and last epoch.
prev_hash = tmp_prev_hash;
}
Ok(())
Expand Down
91 changes: 56 additions & 35 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use std::{error::Error, sync::Arc};

use async_trait::async_trait;
use bincode::serialize;
use bytes::Bytes;
use futures::lock::Mutex;
use overlord::types::{Commit, Node, OverlordMsg, Status};
Expand All @@ -20,9 +21,10 @@ use protocol::types::{
};
use protocol::{ProtocolError, ProtocolResult};

use crate::fixed_types::{FixedPill, FixedSignedTxs};
use crate::fixed_types::{FixedEpochID, FixedPill, FixedSignedTxs};
use crate::message::{
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE, RPC_SYNC_PULL,
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_RICH_EPOCH_ID, END_GOSSIP_SIGNED_PROPOSAL,
END_GOSSIP_SIGNED_VOTE, RPC_SYNC_PULL,
};
use crate::ConsensusError;

Expand Down Expand Up @@ -157,6 +159,11 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill, FixedSignedTxs>
.get_full_txs(ctx.clone(), pill.epoch.ordered_tx_hashes.clone())
.await?;

// TODO: parallelism
self.adapter
.flush_mempool(ctx.clone(), pill.epoch.ordered_tx_hashes.clone())
.await?;

// Execute transactions
let node_info = self.node_info.clone();
let status = self.current_consensus_status.read().clone();
Expand All @@ -166,33 +173,26 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill, FixedSignedTxs>
.execute(ctx.clone(), node_info, status, coinbase, full_txs.clone())
.await?;

// Save receipts
self.adapter
.save_receipts(ctx.clone(), exec_resp.receipts.clone())
.await?;
// Save signed transactions
// Broadcast rich epoch ID
let msg = serialize(&FixedEpochID::new(epoch_id + 1)).map_err(|_| {
ProtocolError::from(ConsensusError::Other(
"Encode rich epoch ID error".to_string(),
))
})?;
self.adapter
.save_signed_txs(ctx.clone(), full_txs.clone())
.transmit(
ctx.clone(),
msg,
END_GOSSIP_RICH_EPOCH_ID,
MessageTarget::Broadcast,
)
.await?;

// Save the epoch.
let mut epoch = pill.epoch;
self.adapter.save_epoch(ctx.clone(), epoch.clone()).await?;

self.adapter
.flush_mempool(ctx.clone(), epoch.ordered_tx_hashes.clone())
self.update_status(epoch_id, pill.epoch, proof, exec_resp, full_txs)
.await?;

let prev_hash = Hash::digest(epoch.encode().await?);

// TODO: update current consensus status
let current_consensus_status = {
let mut current_consensus_status = self.current_consensus_status.write();
current_consensus_status.epoch_id = epoch_id + 1;
current_consensus_status.prev_hash = prev_hash;
current_consensus_status.proof = proof;
current_consensus_status.state_root = exec_resp.state_root.clone();

let current_consensus_status = self.current_consensus_status.read();
current_consensus_status.clone()
};

Expand Down Expand Up @@ -309,18 +309,6 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
self.adapter.get_epoch_by_id(ctx, epoch_id).await
}

pub async fn save_signed_txs(&self, txs: Vec<SignedTransaction>) -> ProtocolResult<()> {
self.adapter.save_signed_txs(Context::new(), txs).await
}

pub async fn save_proof(&self, proof: Proof) -> ProtocolResult<()> {
self.adapter.save_proof(Context::new(), proof).await
}

pub async fn save_epoch(&self, epoch: Epoch) -> ProtocolResult<()> {
self.adapter.save_epoch(Context::new(), epoch).await
}

pub async fn exec(
&self,
address: Address,
Expand All @@ -334,6 +322,39 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
.execute(Context::new(), self.node_info.clone(), status, address, txs)
.await
}

/// **TODO:** update current consensus status
pub async fn update_status(
&self,
epoch_id: u64,
mut epoch: Epoch,
proof: Proof,
exec_resp: ExecutorExecResp,
txs: Vec<SignedTransaction>,
) -> ProtocolResult<()> {
// Save receipts
self.adapter
.save_receipts(Context::new(), exec_resp.receipts.clone())
.await?;
// Save signed transactions
self.adapter.save_signed_txs(Context::new(), txs).await?;

// Save the epoch.
self.adapter
.save_epoch(Context::new(), epoch.clone())
.await?;

let prev_hash = Hash::digest(epoch.encode().await?);
{
let mut current_consensus_status = self.current_consensus_status.write();
current_consensus_status.epoch_id = epoch_id + 1;
current_consensus_status.prev_hash = prev_hash;
current_consensus_status.proof = proof;
current_consensus_status.state_root = exec_resp.state_root.clone();
}

Ok(())
}
}

fn covert_to_overlord_authority(validators: &[Validator]) -> Vec<Node> {
Expand Down
12 changes: 9 additions & 3 deletions core/consensus/src/fixed_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub enum ConsensusRpcRequest {
PullTxs(PullTxsRequest),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ConsensusRpcResponse {
PullEpochs(Box<FixedEpoch>),
PullTxs(Box<FixedSignedTxs>),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FixedEpochID {
pub inner: u64,
Expand All @@ -38,14 +44,14 @@ impl PullTxsRequest {
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FixedEpochs {
pub struct FixedEpoch {
#[serde(with = "core_network::serde")]
pub inner: Epoch,
}

impl FixedEpochs {
impl FixedEpoch {
pub fn new(inner: Epoch) -> Self {
FixedEpochs { inner }
FixedEpoch { inner }
}
}

Expand Down
16 changes: 14 additions & 2 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub enum MsgType {

#[display(fmt = "Rich Epoch ID")]
RichEpochID,

#[display(fmt = "Rpc Pull Epochs")]
RpcPullEpochs,

#[display(fmt = "Rpc Pull Transactions")]
RpcPullTxs,
}

/// Consensus errors defines here.
Expand Down Expand Up @@ -63,12 +69,18 @@ pub enum ConsensusError {
#[display(fmt = "Crypto error {:?}", _0)]
CryptoErr(Box<CryptoError>),

///
/// The synchronous epoch does not pass the checks.
#[display(fmt = "Synchronization {} epoch error", _0)]
SyncEpochHashErr(u64),

///
/// The synchronous epoch proof does not pass the checks.
#[display(fmt = "Synchronization {} proof error", _0)]
SyncEpochProofErr(u64),

/// The Rpc response mismatch the request.
#[display(fmt = "Synchronization Rpc {:?} message mismatch", _0)]
RpcErr(MsgType),

/// Other error used for very few errors.
#[display(fmt = "{:?}", _0)]
Other(String),
Expand Down
60 changes: 58 additions & 2 deletions core/consensus/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use overlord::Codec;
use rlp::Encodable;
use serde::{Deserialize, Serialize};

use protocol::traits::{Consensus, Context, MessageHandler};
use protocol::traits::{Consensus, Context, MessageHandler, Priority, Rpc, Storage};
use protocol::ProtocolResult;

use crate::fixed_types::FixedEpochID;
use crate::fixed_types::{
ConsensusRpcRequest, ConsensusRpcResponse, FixedEpoch, FixedEpochID, FixedSignedTxs,
};

pub const END_GOSSIP_SIGNED_PROPOSAL: &str = "/gossip/consensus/signed_proposal";
pub const END_GOSSIP_SIGNED_VOTE: &str = "/gossip/consensus/signed_vote";
Expand Down Expand Up @@ -129,3 +131,57 @@ impl<C: Consensus + 'static> MessageHandler for RichEpochIDMessageHandler<C> {
self.consensus.update_epoch(ctx, msg.0).await
}
}

#[derive(Debug)]
pub struct RpcHandler<R, S> {
rpc: Arc<R>,
storage: Arc<S>,
}

#[async_trait]
impl<R: Rpc + 'static, S: Storage + 'static> MessageHandler for RpcHandler<R, S> {
type Message = ConsensusRpcRequest;

async fn process(&self, ctx: Context, msg: ConsensusRpcRequest) -> ProtocolResult<()> {
match msg {
ConsensusRpcRequest::PullEpochs(ep) => {
let res = self.storage.get_epoch_by_epoch_id(ep.inner).await?;

self.rpc
.response(
ctx,
RPC_SYNC_PULL,
ConsensusRpcResponse::PullEpochs(Box::new(FixedEpoch::new(res))),
Priority::High,
)
.await
}

ConsensusRpcRequest::PullTxs(txs) => {
let mut res = Vec::new();
for tx in txs.inner.into_iter() {
res.push(self.storage.get_transaction_by_hash(tx).await?);
}

self.rpc
.response(
ctx,
RPC_SYNC_PULL,
ConsensusRpcResponse::PullTxs(Box::new(FixedSignedTxs::new(res))),
Priority::High,
)
.await
}
}
}
}

impl<R, S> RpcHandler<R, S>
where
R: Rpc + 'static,
S: Storage + 'static,
{
pub fn new(rpc: Arc<R>, storage: Arc<S>) -> Self {
RpcHandler { rpc, storage }
}
}
Loading

0 comments on commit 49ed197

Please sign in to comment.