Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(mempool): integrate trust metric (#245)
Browse files Browse the repository at this point in the history
* add trust metric in mempool

* change(mempool): remove outdated FIXME

* fix(mempool): no peer id in context for trust metric feedback

* fix(mempool): rebase master

* ci: force travis pull

Co-authored-by: wancencen <[email protected]>
  • Loading branch information
zeroqn and rev-chaos authored May 6, 2020
1 parent 3dd6bc1 commit 49474fd
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 7 deletions.
1 change: 1 addition & 0 deletions core/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ protocol = { path = "../../protocol", package = "muta-protocol" }
common-crypto = { path = "../../common/crypto" }
core-network = { path = "../network" }


futures = { version = "0.3", features = [ "async-await" ] }
crossbeam-queue = "0.2"
derive_more = "0.99"
Expand Down
2 changes: 0 additions & 2 deletions core/mempool/src/adapter/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ where
log::error!("[core_mempool] mempool batch insert error");
}

// FIXME
TrustFeedback::Neutral
}
}
Expand Down Expand Up @@ -122,7 +121,6 @@ where
.unwrap_or_else(move |err| log::warn!("[core_mempool] push txs {}", err))
.await;

// FIXME
TrustFeedback::Neutral
}
}
76 changes: 71 additions & 5 deletions core/mempool/src/adapter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::TxContext;

pub mod message;

use std::{
Expand All @@ -24,7 +26,7 @@ use log::{debug, error};
use common_crypto::Crypto;
use protocol::{
fixed_codec::FixedCodec,
traits::{Context, Gossip, MemPoolAdapter, Priority, Rpc, Storage},
traits::{Context, Gossip, MemPoolAdapter, PeerTrust, Priority, Rpc, Storage, TrustFeedback},
types::{Hash, SignedTransaction},
ProtocolError, ProtocolErrorKind, ProtocolResult,
};
Expand Down Expand Up @@ -147,7 +149,7 @@ pub struct DefaultMemPoolAdapter<C, N, S> {
impl<C, N, S> DefaultMemPoolAdapter<C, N, S>
where
C: Crypto,
N: Rpc + Gossip + Clone + Unpin + 'static,
N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static,
S: Storage,
{
pub fn new(
Expand Down Expand Up @@ -193,7 +195,7 @@ where
impl<C, N, S> MemPoolAdapter for DefaultMemPoolAdapter<C, N, S>
where
C: Crypto + Send + Sync + 'static,
N: Rpc + Gossip + Clone + Unpin + 'static,
N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static,
S: Storage + 'static,
{
async fn pull_txs(
Expand Down Expand Up @@ -227,13 +229,25 @@ where
Ok(())
}

async fn check_signature(&self, _ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> {
async fn check_signature(&self, ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> {
let network = self.network.clone();

let blocking_res = tokio::task::spawn_blocking(move || {
// Verify transaction hash
let fixed_bytes = tx.raw.encode_fixed()?;
let tx_hash = Hash::digest(fixed_bytes);

if tx_hash != tx.tx_hash {
if ctx.is_network_origin_txs() {
network.report(
ctx,
TrustFeedback::Worse(format!(
"Mempool wrong tx_hash of tx {:?}",
tx.tx_hash
)),
);
}

let wrong_hash = MemPoolError::CheckHash {
expect: tx.tx_hash,
actual: tx_hash,
Expand All @@ -247,6 +261,16 @@ where
let sig = tx.signature.as_ref();

C::verify_signature(hash.as_ref(), sig, pub_key).map_err(|_| {
if ctx.is_network_origin_txs() {
network.report(
ctx,
TrustFeedback::Worse(format!(
"Mempool wrong signature of tx {:?}",
tx.tx_hash
)),
);
}

MemPoolError::CheckSig {
tx_hash: tx.tx_hash,
}
Expand All @@ -267,14 +291,23 @@ where
// TODO: Verify Fee?
// TODO: Verify Nonce?
// TODO: Cycle limit?
async fn check_transaction(&self, _ctx: Context, stx: SignedTransaction) -> ProtocolResult<()> {
async fn check_transaction(&self, ctx: Context, stx: SignedTransaction) -> ProtocolResult<()> {
let fixed_bytes = stx.raw.encode_fixed()?;
let size = fixed_bytes.len() as u64;
let tx_hash = stx.tx_hash.clone();

// check tx size
let max_tx_size = self.max_tx_size.load(Ordering::SeqCst);
if size > max_tx_size {
if ctx.is_network_origin_txs() {
self.network.report(
ctx,
TrustFeedback::Bad(format!(
"Mempool exceed size limit of tx {:?}",
stx.tx_hash
)),
);
}
return Err(MemPoolError::ExceedSizeLimit {
tx_hash,
max_tx_size,
Expand All @@ -287,6 +320,15 @@ where
let cycles_limit_config = self.cycles_limit.load(Ordering::SeqCst);
let cycles_limit_tx = stx.raw.cycles_limit;
if cycles_limit_tx > cycles_limit_config {
if ctx.is_network_origin_txs() {
self.network.report(
ctx,
TrustFeedback::Bad(format!(
"Mempool exceed cycle limit of tx {:?}",
stx.tx_hash
)),
);
}
return Err(MemPoolError::ExceedCyclesLimit {
tx_hash,
cycles_limit_tx,
Expand All @@ -298,6 +340,12 @@ where
// Verify chain id
let latest_block = self.storage.get_latest_block().await?;
if latest_block.header.chain_id != stx.raw.chain_id {
if ctx.is_network_origin_txs() {
self.network.report(
ctx,
TrustFeedback::Worse(format!("Mempool wrong chain of tx {:?}", stx.tx_hash)),
);
}
let wrong_chain_id = MemPoolError::WrongChain {
tx_hash: stx.tx_hash,
};
Expand All @@ -310,6 +358,12 @@ where
let timeout_gap = self.timeout_gap.load(Ordering::SeqCst);

if stx.raw.timeout > latest_height + timeout_gap {
if ctx.is_network_origin_txs() {
self.network.report(
ctx,
TrustFeedback::Bad(format!("Mempool invalid timeout of tx {:?}", stx.tx_hash)),
);
}
let invalid_timeout = MemPoolError::InvalidTimeout {
tx_hash: stx.tx_hash,
};
Expand All @@ -318,6 +372,12 @@ where
}

if stx.raw.timeout < latest_height {
if ctx.is_network_origin_txs() {
self.network.report(
ctx,
TrustFeedback::Bad(format!("Mempool timeout of tx {:?}", stx.tx_hash)),
);
}
let timeout = MemPoolError::Timeout {
tx_hash: stx.tx_hash,
timeout: stx.raw.timeout,
Expand Down Expand Up @@ -348,6 +408,12 @@ where
Ok(height)
}

fn report_good(&self, ctx: Context) {
if ctx.is_network_origin_txs() {
self.network.report(ctx, TrustFeedback::Good);
}
}

fn set_args(&self, timeout_gap: u64, cycles_limit: u64, max_tx_size: u64) {
self.timeout_gap.store(timeout_gap, Ordering::Relaxed);
self.cycles_limit.store(cycles_limit, Ordering::Relaxed);
Expand Down
4 changes: 4 additions & 0 deletions core/mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ where

if !ctx.is_network_origin_txs() {
self.adapter.broadcast_tx(ctx, tx).await?;
} else {
self.adapter.report_good(ctx);
}

Ok(())
Expand Down Expand Up @@ -265,6 +267,8 @@ where
.insert(signed_tx.tx_hash.clone(), signed_tx)
.await;
}

self.adapter.report_good(ctx);
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions core/mempool/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl MemPoolAdapter for HashMemPoolAdapter {
Ok(CURRENT_HEIGHT)
}

fn report_good(&self, _ctx: Context) {}

fn set_args(&self, _timeout_gap: u64, _cycles_limit: u64, _max_tx_size: u64) {}
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/src/traits/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,7 @@ pub trait MemPoolAdapter: Send + Sync {

async fn get_latest_height(&self, ctx: Context) -> ProtocolResult<u64>;

fn report_good(&self, ctx: Context);

fn set_args(&self, timeout_gap: u64, cycles_limit: u64, max_tx_size: u64);
}

0 comments on commit 49474fd

Please sign in to comment.