Skip to content

Commit

Permalink
feat(sync): synchronization epoch (nervosnetwork#9)
Browse files Browse the repository at this point in the history
* feat: develop merkle root (nervosnetwork#17)

* feat: develop merkle root

* refactor arguments

* remove useless code

dev synchronization

dev synchronization

dev network handler

refactor update current status

cargo fmt

refactor mempool test

add iql

fix synchronous bug

cargo update

dev synchronization

tmp

update cache status

fix(mempool): Resize the queue to ensure correct switching. (nervosnetwork#18)

* fix(mempool): Resize the queue to ensure correct switching.

* resize map.

* resize map.

cargo fmt

add log

cargo fmt

feat: synchronization

cargo clippy

fix: save proof and locks

fix sync

cargo clippy

fix a bug

cargo fmt

tmp config

fix

tmp commit

fix mempool

* fix synchronization

* fix CI

* remove debug config files

* Update core/consensus/Cargo.toml

Co-Authored-By: Jiayu Ye <[email protected]>

* update Cargo lock

* fix cargo toml
  • Loading branch information
Eason Gao authored and yejiayu committed Oct 31, 2019
1 parent 02c5d2e commit 97bb21b
Show file tree
Hide file tree
Showing 14 changed files with 862 additions and 381 deletions.
499 changes: 227 additions & 272 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ If everything goes well, you’ll see this appear:
[2019-09-25T15:26:14Z INFO muta] Go with config: Config { .. }
```

Go to [http://127.0.0.1:8000/graphiql](http://127.0.0.1:8000/graphiql) to communicate with the chain and read the documentations after the chain is running.

The develop chain is worked on **LOCAL** and **SINGLE NODE**.
5 changes: 4 additions & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ bytes = "0.4"
creep = "0.1"
derive_more = "0.15"
futures-preview = "0.3.0-alpha.19"
overlord = { git = "https://github.com/cryptape/overlord.git", rev = "10ce94a0" }
log = "0.4"
overlord = { git = "https://github.com/cryptape/overlord.git", rev = "10ce94a" }
parking_lot = "0.9"
rlp = "0.4"
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"

common-crypto = { path = "../../common/crypto"}
common-merkle = { path = "../../common/merkle"}
core-mempool = { path = "../../core/mempool"}
core-storage = { path = "../../core/storage"}
core-network = { path = "../../core/network"}
Expand Down
90 changes: 79 additions & 11 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,30 @@ use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use log::debug;

use protocol::traits::executor::{ExecutorExecResp, ExecutorFactory, TrieDB};
use protocol::traits::{
ConsensusAdapter, Context, CurrentConsensusStatus, Gossip, MemPool, MessageTarget,
MixedTxHashes, NodeInfo, Priority, Storage,
ConsensusAdapter, Context, Gossip, MemPool, MessageTarget, MixedTxHashes, NodeInfo, Priority,
Rpc, Storage,
};
use protocol::types::{
Address, Epoch, Hash, MerkleRoot, Proof, Receipt, SignedTransaction, Validator,
};
use protocol::types::{Address, Epoch, Hash, Proof, Receipt, SignedTransaction, Validator};
use protocol::ProtocolResult;

use crate::fixed_types::{ConsensusRpcRequest, ConsensusRpcResponse, PullTxsRequest};
use crate::{ConsensusError, MsgType};

pub struct OverlordConsensusAdapter<
EF: ExecutorFactory<DB>,
G: Gossip,
M: MemPool,
R: Rpc,
S: Storage,
DB: TrieDB,
> {
rpc: Arc<R>,
network: Arc<G>,
mempool: Arc<M>,
storage: Arc<S>,
Expand All @@ -27,10 +35,11 @@ pub struct OverlordConsensusAdapter<
}

#[async_trait]
impl<EF, G, M, S, DB> ConsensusAdapter for OverlordConsensusAdapter<EF, G, M, S, DB>
impl<EF, G, M, R, S, DB> ConsensusAdapter for OverlordConsensusAdapter<EF, G, M, R, S, DB>
where
EF: ExecutorFactory<DB>,
G: Gossip + Sync + Send,
R: Rpc + Sync + Send,
M: MemPool,
S: Storage,
DB: TrieDB,
Expand Down Expand Up @@ -84,18 +93,19 @@ where

async fn execute(
&self,
_ctx: Context,
node_info: NodeInfo,
status: CurrentConsensusStatus,
state_root: MerkleRoot,
epoch_id: u64,
cycles_price: u64,
coinbase: Address,
signed_txs: Vec<SignedTransaction>,
) -> ProtocolResult<ExecutorExecResp> {
let mut executor = EF::from_root(
node_info.chain_id,
status.state_root,
state_root,
Arc::clone(&self.trie_db),
status.epoch_id,
status.cycles_price,
epoch_id,
cycles_price,
coinbase,
)?;
executor.exec(signed_txs)
Expand Down Expand Up @@ -133,18 +143,76 @@ where
let epoch = self.storage.get_epoch_by_epoch_id(epoch_id).await?;
Ok(epoch.header.validators)
}

async fn get_current_epoch_id(&self, _ctx: Context) -> ProtocolResult<u64> {
let res = self.storage.get_latest_epoch().await?;
Ok(res.header.epoch_id)
}

async fn pull_epoch(&self, ctx: Context, epoch_id: u64, end: &str) -> ProtocolResult<Epoch> {
debug!("consensus: send rpc pull epoch {}", epoch_id);
let res = self
.rpc
.call::<ConsensusRpcRequest, ConsensusRpcResponse>(
ctx,
end,
ConsensusRpcRequest::PullEpochs(epoch_id),
Priority::High,
)
.await?;

match res {
ConsensusRpcResponse::PullEpochs(epoch) => Ok(epoch.inner),
_ => Err(ConsensusError::RpcErr(MsgType::RpcPullEpochs).into()),
}
}

async fn pull_txs(
&self,
ctx: Context,
hashes: Vec<Hash>,
end: &str,
) -> ProtocolResult<Vec<SignedTransaction>> {
let msg = PullTxsRequest::new(hashes);
let res = self
.rpc
.call::<ConsensusRpcRequest, ConsensusRpcResponse>(
ctx,
end,
ConsensusRpcRequest::PullTxs(msg),
Priority::High,
)
.await?;

match res {
ConsensusRpcResponse::PullTxs(txs) => Ok(txs.inner),
_ => Err(ConsensusError::RpcErr(MsgType::RpcPullTxs).into()),
}
}

async fn get_epoch_by_id(&self, _ctx: Context, epoch_id: u64) -> ProtocolResult<Epoch> {
self.storage.get_epoch_by_epoch_id(epoch_id).await
}
}

impl<EF, G, M, S, DB> OverlordConsensusAdapter<EF, G, M, S, DB>
impl<EF, G, M, R, S, DB> OverlordConsensusAdapter<EF, G, M, R, S, DB>
where
EF: ExecutorFactory<DB>,
G: Gossip + Sync + Send,
R: Rpc + Sync + Send,
M: MemPool,
S: Storage,
DB: TrieDB,
{
pub fn new(network: Arc<G>, mempool: Arc<M>, storage: Arc<S>, trie_db: Arc<DB>) -> Self {
pub fn new(
rpc: Arc<R>,
network: Arc<G>,
mempool: Arc<M>,
storage: Arc<S>,
trie_db: Arc<DB>,
) -> Self {
OverlordConsensusAdapter {
rpc,
network,
mempool,
storage,
Expand Down
144 changes: 134 additions & 10 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::sync::Arc;

use async_trait::async_trait;
use bincode::deserialize;
use creep::Context;
use futures::lock::Mutex;
use log::{debug, info};
use overlord::types::{AggregatedVote, Node, OverlordMsg, SignedProposal, SignedVote, Status};
use overlord::{DurationConfig, Overlord, OverlordHandler};
use parking_lot::RwLock;

use common_crypto::{PrivateKey, Secp256k1PrivateKey};

use protocol::traits::{Consensus, ConsensusAdapter, CurrentConsensusStatus, NodeInfo};
use protocol::types::{Epoch, Proof, SignedTransaction, Validator};
use protocol::ProtocolResult;
use protocol::types::{Address, Hash, Proof, Validator};
use protocol::{codec::ProtocolCodec, ProtocolResult};

use crate::engine::ConsensusEngine;
use crate::fixed_types::{FixedPill, FixedSignedTxs};
use crate::fixed_types::{FixedEpochID, FixedPill, FixedSignedTxs};
use crate::util::OverlordCrypto;
use crate::{ConsensusError, MsgType};

Expand All @@ -26,6 +29,8 @@ pub struct OverlordConsensus<Adapter: ConsensusAdapter + 'static> {
handler: OverlordHandler<FixedPill>,
/// A consensus engine for synchronous.
engine: Arc<ConsensusEngine<Adapter>>,
/// Synchronization lock.
lock: Mutex<()>,
}

#[async_trait]
Expand Down Expand Up @@ -57,13 +62,127 @@ impl<Adapter: ConsensusAdapter + 'static> Consensus for OverlordConsensus<Adapte
Ok(())
}

async fn update_epoch(
&self,
_ctx: Context,
_epoch: Epoch,
_signed_txs: Vec<SignedTransaction>,
_proof: Proof,
) -> ProtocolResult<()> {
async fn update_epoch(&self, ctx: Context, msg: Vec<u8>) -> ProtocolResult<()> {
let sync_lock = self.lock.try_lock();
if sync_lock.is_none() {
// Synchronization is processing.
return Ok(());
}

// Reveive the rich epoch ID.
let epoch_id: FixedEpochID =
deserialize(&msg).map_err(|_| ConsensusError::DecodeErr(MsgType::RichEpochID))?;
let rich_epoch_id = epoch_id.inner - 1;

// TODO: fix to get_epoch_by_epoch_id()
let current_epoch_id = self
.engine
.get_current_epoch_id(ctx.clone())
.await
.unwrap_or(1u64);

if current_epoch_id >= rich_epoch_id - 1 {
return Ok(());
}

// Lock the consensus engine, block commit process.
let commit_lock = self.engine.lock.try_lock();
if commit_lock.is_none() {
return Ok(());
}

info!("self {}, chain {}", current_epoch_id, rich_epoch_id);
info!("consensus: start synchronization");

let mut state_root = Hash::from_empty();
let mut current_hash = if current_epoch_id != 0 {
let mut current_epoch = self
.engine
.get_epoch_by_id(ctx.clone(), current_epoch_id)
.await?;
state_root = current_epoch.header.state_root.clone();
let tmp = Hash::digest(current_epoch.encode().await?);

// Check epoch for the first time.
let epoch_header = self
.engine
.pull_epoch(ctx.clone(), current_epoch_id + 1)
.await?
.header;
self.check_proof(current_epoch_id + 1, epoch_header.proof.clone())?;
if tmp != epoch_header.pre_hash {
return Err(ConsensusError::SyncEpochHashErr(current_epoch_id + 1).into());
}
tmp
} else {
Hash::from_empty()
};

// Start to synchronization.
for id in (current_epoch_id + 1)..=rich_epoch_id {
info!("consensus: start synchronization epoch {}", id);

// First pull a new block.
debug!("consensus: synchronization pull epoch {}", id);
let mut epoch = self.engine.pull_epoch(ctx.clone(), id).await?;

// Check proof and previous hash.
debug!("consensus: synchronization check proof and previous hash");
let proof = epoch.header.proof.clone();
self.check_proof(id, proof.clone())?;
if id != 1 && current_hash != epoch.header.pre_hash {
return Err(ConsensusError::SyncEpochHashErr(id).into());
}
self.engine.save_proof(ctx.clone(), proof.clone()).await?;

// Then pull signed transactions.
debug!("consensus: synchronization pull signed transactions");
let txs = self
.engine
.pull_txs(ctx.clone(), epoch.ordered_tx_hashes.clone())
.await?;

// After get the signed transactions:
// 1. Execute the signed transactions.
// 2. Save the signed transactions.
// 3. Save the latest proof.
// 4. Save the new epoch.
// 5. Save the receipt.
debug!("consensus: synchronization executor the epoch");
let exec_resp = self
.engine
.exec(
state_root.clone(),
epoch.header.epoch_id,
Address::User(epoch.header.proposer.clone()),
txs.clone(),
)
.await?;
state_root = exec_resp.state_root.clone();

debug!("consensus: synchronization update the rich status");
self.engine
.update_status(epoch.header.epoch_id, epoch.clone(), proof, exec_resp, txs)
.await?;

// Update the previous hash and last epoch.
info!("consensus: finish synchronization {} epoch", id);
current_hash = Hash::digest(epoch.encode().await?);
}

debug!(
"consensus: synchronization send overlord rich status {}",
rich_epoch_id
);
let status = Status {
epoch_id: rich_epoch_id + 1,
interval: Some(self.engine.get_current_interval()),
authority_list: self.engine.get_current_authority_list(),
};

self.handler
.send_msg(ctx, OverlordMsg::RichStatus(status))
.map_err(|e| ConsensusError::OverlordErr(Box::new(e)))?;
Ok(())
}
}
Expand Down Expand Up @@ -105,6 +224,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
Self {
inner: Arc::new(overlord),
handler: overlord_handler,
lock: Mutex::new(()),
engine,
}
}
Expand All @@ -121,6 +241,10 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {

Ok(())
}

fn check_proof(&self, _epoch_id: u64, _proof: Proof) -> ProtocolResult<()> {
Ok(())
}
}

fn gen_overlord_status(epoch_id: u64, interval: u64, validators: Vec<Validator>) -> Status {
Expand Down
Loading

0 comments on commit 97bb21b

Please sign in to comment.