feat(network): tag consensus peers (#6)
* feat(network): implement tag api

* feat: tag consensus peers
zeroqn committed Jul 27, 2020
1 parent 9df66d4 commit 4320ab9
Showing 10 changed files with 190 additions and 21 deletions.
core/consensus/src/adapter.rs (21 changes: 15 additions & 6 deletions)

@@ -18,7 +18,7 @@ use core_network::{PeerId, PeerIdExt};
 
 use protocol::traits::{
     CommonConsensusAdapter, ConsensusAdapter, Context, ExecutorFactory, ExecutorParams,
-    ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, PeerTrust, Priority, Rpc,
+    ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, Network, PeerTrust, Priority, Rpc,
     ServiceMapping, Storage, SynchronizationAdapter, TrustFeedback,
 };
 use protocol::types::{
@@ -45,7 +45,7 @@ const OVERLORD_GAP: usize = 10;
 pub struct OverlordConsensusAdapter<
     EF: ExecutorFactory<DB, S, Mapping>,
     M: MemPool,
-    N: Rpc + PeerTrust + Gossip + 'static,
+    N: Rpc + PeerTrust + Gossip + Network + 'static,
     S: Storage,
     DB: cita_trie::DB,
     Mapping: ServiceMapping,
@@ -68,7 +68,7 @@ impl<EF, M, N, S, DB, Mapping> ConsensusAdapter
 where
     EF: ExecutorFactory<DB, S, Mapping>,
     M: MemPool + 'static,
-    N: Rpc + PeerTrust + Gossip + 'static,
+    N: Rpc + PeerTrust + Gossip + Network + 'static,
     S: Storage + 'static,
     DB: cita_trie::DB + 'static,
     Mapping: ServiceMapping + 'static,
@@ -220,7 +220,7 @@ impl<EF, M, N, S, DB, Mapping> SynchronizationAdapter
 where
     EF: ExecutorFactory<DB, S, Mapping>,
     M: MemPool + 'static,
-    N: Rpc + PeerTrust + Gossip + 'static,
+    N: Rpc + PeerTrust + Gossip + Network + 'static,
     S: Storage + 'static,
     DB: cita_trie::DB + 'static,
     Mapping: ServiceMapping + 'static,
@@ -354,7 +354,7 @@ impl<EF, M, N, S, DB, Mapping> CommonConsensusAdapter
 where
     EF: ExecutorFactory<DB, S, Mapping>,
     M: MemPool + 'static,
-    N: Rpc + PeerTrust + Gossip + 'static,
+    N: Rpc + PeerTrust + Gossip + Network + 'static,
     S: Storage + 'static,
     DB: cita_trie::DB + 'static,
     Mapping: ServiceMapping + 'static,
@@ -492,6 +492,15 @@ where
         Ok(serde_json::from_str(&exec_resp.succeed_data).expect("Decode metadata failed!"))
     }
 
+    fn tag_consensus(&self, ctx: Context, pub_keys: Vec<Bytes>) -> ProtocolResult<()> {
+        let peer_ids_bytes = pub_keys
+            .iter()
+            .map(|pk| PeerId::from_pubkey_bytes(pk).map(PeerIdExt::into_bytes_ext))
+            .collect::<Result<_, _>>()?;
+
+        self.network.tag_consensus(ctx, peer_ids_bytes)
+    }
+
     #[muta_apm::derive::tracing_span(kind = "consensus.adapter")]
     fn report_bad(&self, ctx: Context, feedback: TrustFeedback) {
         self.network.report(ctx, feedback);
@@ -808,7 +817,7 @@ impl<EF, M, N, S, DB, Mapping> OverlordConsensusAdapter<EF, M, N, S, DB, Mapping
 where
     EF: ExecutorFactory<DB, S, Mapping>,
     M: MemPool + 'static,
-    N: Rpc + PeerTrust + Gossip + 'static,
+    N: Rpc + PeerTrust + Gossip + Network + 'static,
     S: Storage + 'static,
     DB: cita_trie::DB + 'static,
     Mapping: ServiceMapping + 'static,
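The adapter's new `tag_consensus` leans on collecting an iterator of `Result`s into a single `Result`, so one bad public key fails the whole call. A minimal, self-contained sketch of that pattern follows; the helper and byte types are stand-ins, not the real `core_network` API.

```rust
// Stand-in for the pubkey -> peer-id-bytes conversion done via
// PeerId::from_pubkey_bytes(..) + PeerIdExt::into_bytes_ext() in the commit.
fn peer_id_bytes_from_pubkey(pk: &[u8]) -> Result<Vec<u8>, String> {
    if pk.is_empty() {
        return Err("empty public key".to_owned());
    }
    Ok(pk.to_vec())
}

fn main() {
    let pub_keys: Vec<Vec<u8>> = vec![vec![0x02, 0xaa], vec![0x03, 0xbb]];

    // A Vec of Results collects into Result<Vec<_>, _>: the first Err aborts
    // the whole collection, which is what the `?` in the adapter method relies on.
    let peer_ids_bytes: Result<Vec<Vec<u8>>, String> = pub_keys
        .iter()
        .map(|pk| peer_id_bytes_from_pubkey(pk))
        .collect();

    assert_eq!(peer_ids_bytes.unwrap().len(), 2);
}
```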
core/consensus/src/synchronization.rs (7 changes: 7 additions & 0 deletions)

@@ -338,6 +338,13 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
             metadata.max_tx_size,
         );
 
+        let pub_keys = metadata
+            .verifier_list
+            .iter()
+            .map(|v| v.pub_key.decode())
+            .collect();
+        self.adapter.tag_consensus(ctx.clone(), pub_keys)?;
+
         log::info!(
             "[synchronization]: commit_block, committing block header: {}, committing proof:{:?}",
             block.header.clone(),
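Note that the `collect()` above carries no type annotation: the element type of `pub_keys` is pinned down by the `Vec<Bytes>` parameter of the adapter's `tag_consensus`. A small sketch of that inference with stand-in types (not the real `Bytes`/`Validator`):

```rust
// Stand-ins: Vec<u8> plays the role of Bytes, Validator mirrors a metadata entry.
struct Validator {
    pub_key: Vec<u8>,
}

fn tag_consensus(pub_keys: Vec<Vec<u8>>) -> Result<(), String> {
    println!("tagging {} consensus peers", pub_keys.len());
    Ok(())
}

fn main() -> Result<(), String> {
    let verifier_list = vec![Validator { pub_key: vec![1] }, Validator { pub_key: vec![2] }];

    // No turbofish needed: collect() infers Vec<Vec<u8>> from tag_consensus's
    // signature, just as the commit's code infers Vec<Bytes> from the adapter trait.
    let pub_keys = verifier_list.iter().map(|v| v.pub_key.clone()).collect();
    tag_consensus(pub_keys)
}
```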
core/consensus/src/tests/synchronization.rs (4 changes: 4 additions & 0 deletions)

@@ -300,6 +300,10 @@ impl CommonConsensusAdapter for MockCommonConsensusAdapter {
         })
     }
 
+    fn tag_consensus(&self, _: Context, _: Vec<Bytes>) -> ProtocolResult<()> {
+        Ok(())
+    }
+
     fn report_bad(&self, _ctx: Context, _feedback: TrustFeedback) {}
 
     fn set_args(
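The mock gains a no-op `tag_consensus` only because `CommonConsensusAdapter` now requires it; any implementor that does not add the method stops compiling. A tiny illustration with stand-in types:

```rust
// Stand-in trait and mock; the real ones are CommonConsensusAdapter and
// MockCommonConsensusAdapter in the test module above.
trait Adapter {
    fn report_bad(&self);
    fn tag_consensus(&self, pub_keys: Vec<Vec<u8>>) -> Result<(), String>; // newly required
}

struct Mock;

impl Adapter for Mock {
    fn report_bad(&self) {}

    // Without this stub, Mock no longer satisfies the trait once the new
    // method is added.
    fn tag_consensus(&self, _pub_keys: Vec<Vec<u8>>) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    assert!(Mock.tag_consensus(vec![]).is_ok());
}
```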
core/network/src/peer_manager/mod.rs (57 changes: 53 additions & 4 deletions)

@@ -270,17 +270,19 @@ impl Deref for ArcSession {
 }
 
 struct Inner {
-    sessions: RwLock<HashSet<ArcSession>>,
-    peers: RwLock<HashSet<ArcPeer>>,
+    consensus: RwLock<HashSet<PeerId>>,
+    sessions: RwLock<HashSet<ArcSession>>,
+    peers: RwLock<HashSet<ArcPeer>>,
 
     listen: RwLock<HashSet<PeerMultiaddr>>,
 }
 
 impl Inner {
     pub fn new() -> Self {
         Inner {
-            sessions: Default::default(),
-            peers: Default::default(),
+            consensus: Default::default(),
+            sessions: Default::default(),
+            peers: Default::default(),
 
             listen: Default::default(),
         }
@@ -427,6 +429,53 @@ impl PeerManagerHandle {
 
         listen.into_iter().map(sanitize).collect()
     }
+
+    pub fn tag(&self, peer_id: &PeerId, tag: PeerTag) -> Result<(), NetworkError> {
+        let consensus_tag = tag == PeerTag::Consensus;
+
+        if let Some(peer) = self.inner.peer(peer_id) {
+            peer.tags.insert(tag)?;
+        } else {
+            let peer = ArcPeer::new(peer_id.to_owned());
+            peer.tags.insert(tag)?;
+            self.inner.add_peer(peer);
+        }
+
+        if consensus_tag {
+            self.inner.consensus.write().insert(peer_id.to_owned());
+        }
+
+        Ok(())
+    }
+
+    pub fn untag(&self, peer_id: &PeerId, tag: &PeerTag) {
+        if let Some(peer) = self.inner.peer(peer_id) {
+            peer.tags.remove(tag);
+        }
+
+        if tag == &PeerTag::Consensus {
+            self.inner.consensus.write().remove(peer_id);
+        }
+    }
+
+    pub fn tag_consensus(&self, peer_ids: Vec<PeerId>) {
+        {
+            for peer_id in self.inner.consensus.read().iter() {
+                if let Some(peer) = self.inner.peer(peer_id) {
+                    peer.tags.remove(&PeerTag::Consensus)
+                }
+            }
+        }
+
+        for peer_id in peer_ids.iter() {
+            let _ = self.tag(peer_id, PeerTag::Consensus);
+        }
+
+        {
+            let id_set = HashSet::from_iter(peer_ids);
+            *self.inner.consensus.write() = id_set;
+        }
+    }
 }
 
 pub struct PeerManager {
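`PeerManagerHandle::tag_consensus` swaps the consensus set in three steps: drop the `Consensus` tag from the peers currently recorded, tag the incoming peers, then overwrite the recorded set. A self-contained sketch of that flow, using `std::sync::RwLock` and integer ids in place of the real `PeerId`/`ArcPeer` types (the actual code uses lock guards that need no `unwrap()`, so treat this purely as an illustration):

```rust
use std::collections::HashSet;
use std::sync::RwLock;

// Stand-in for the peer manager's Inner: `consensus` records which ids carry
// the consensus tag, `tagged` stands in for the per-peer tag sets.
struct Inner {
    consensus: RwLock<HashSet<u64>>,
    tagged: RwLock<HashSet<u64>>,
}

impl Inner {
    fn tag_consensus(&self, peer_ids: Vec<u64>) {
        // 1. Strip the tag from previously recorded consensus peers.
        for old in self.consensus.read().unwrap().iter() {
            self.tagged.write().unwrap().remove(old);
        }

        // 2. Tag the new consensus peers.
        for id in peer_ids.iter() {
            self.tagged.write().unwrap().insert(*id);
        }

        // 3. Overwrite the recorded consensus set wholesale.
        *self.consensus.write().unwrap() = peer_ids.into_iter().collect();
    }
}

fn main() {
    let inner = Inner {
        consensus: RwLock::new(HashSet::from([1])),
        tagged: RwLock::new(HashSet::from([1])),
    };

    inner.tag_consensus(vec![2, 3]);

    assert!(!inner.tagged.read().unwrap().contains(&1));
    assert!(inner.tagged.read().unwrap().contains(&2));
    assert_eq!(*inner.consensus.read().unwrap(), HashSet::from([2, 3]));
}
```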
core/network/src/peer_manager/test_manager.rs (46 changes: 46 additions & 0 deletions)

@@ -2739,3 +2739,49 @@ async fn should_setup_trust_metric_if_none_on_session_blocked() {
         "should have 1 bad event"
     );
 }
+
+#[tokio::test]
+async fn should_able_to_tag_peer() {
+    let (mgr, _conn_rx) = make_manager(0, 20);
+    let handle = mgr.inner.handle();
+
+    let peer = make_peer(2077);
+    handle.tag(&peer.id, PeerTag::Consensus).unwrap();
+
+    let peer = mgr.core_inner().peer(&peer.id).unwrap();
+    assert!(peer.tags.contains(&PeerTag::Consensus));
+}
+
+#[tokio::test]
+async fn should_able_to_untag_peer() {
+    let (mgr, _conn_rx) = make_manager(0, 20);
+    let handle = mgr.inner.handle();
+
+    let peer = make_peer(2077);
+    handle.tag(&peer.id, PeerTag::Consensus).unwrap();
+
+    let peer = mgr.core_inner().peer(&peer.id).unwrap();
+    assert!(peer.tags.contains(&PeerTag::Consensus));
+
+    handle.untag(&peer.id, &PeerTag::Consensus);
+    assert!(!peer.tags.contains(&PeerTag::Consensus));
+}
+
+#[tokio::test]
+async fn should_remove_old_consensus_peer_tag_when_tag_consensus() {
+    let (mgr, _conn_rx) = make_manager(0, 20);
+    let handle = mgr.inner.handle();
+
+    let peer = make_peer(2077);
+    handle.tag(&peer.id, PeerTag::Consensus).unwrap();
+
+    let peer = mgr.core_inner().peer(&peer.id).unwrap();
+    assert!(peer.tags.contains(&PeerTag::Consensus));
+
+    let new_consensus = make_peer(3077);
+    handle.tag_consensus(vec![new_consensus.owned_id()]);
+
+    let new_consensus = mgr.core_inner().peer(&new_consensus.id).unwrap();
+    assert!(new_consensus.tags.contains(&PeerTag::Consensus));
+    assert!(!peer.tags.contains(&PeerTag::Consensus));
+}
core/network/src/service.rs (45 changes: 38 additions & 7 deletions)

@@ -16,7 +16,8 @@ use futures::{
 use log::{debug, error, info};
 use protocol::{
     traits::{
-        Context, Gossip, MessageCodec, MessageHandler, PeerTrust, Priority, Rpc, TrustFeedback,
+        Context, Gossip, MessageCodec, MessageHandler, Network, PeerTag, PeerTrust, Priority, Rpc,
+        TrustFeedback,
     },
     Bytes, ProtocolResult,
 };
@@ -36,20 +37,21 @@ use crate::{
     message::RawSessionMessage,
     metrics::Metrics,
     outbound::{NetworkGossip, NetworkRpc},
-    peer_manager::{PeerManager, PeerManagerConfig, SharedSessions},
+    peer_manager::{PeerManager, PeerManagerConfig, PeerManagerHandle, SharedSessions},
     protocols::CoreProtocol,
     reactor::{MessageRouter, Reactor},
     rpc_map::RpcMap,
     selfcheck::SelfCheck,
     traits::NetworkContext,
-    NetworkConfig,
+    NetworkConfig, PeerIdExt,
 };
 
 #[derive(Clone)]
 pub struct NetworkServiceHandle {
     gossip: NetworkGossip<ConnectionServiceControl<CoreProtocol, SharedSessions>, Snappy>,
     rpc: NetworkRpc<ConnectionServiceControl<CoreProtocol, SharedSessions>, Snappy>,
     peer_trust: UnboundedSender<PeerManagerEvent>,
+    peer_state: PeerManagerHandle,
 
     #[cfg(feature = "diagnostic")]
     pub diagnostic: Diagnostic,
@@ -128,6 +130,32 @@ impl PeerTrust for NetworkServiceHandle {
     }
 }
 
+impl Network for NetworkServiceHandle {
+    fn tag(&self, _: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()> {
+        let peer_id = <PeerId as PeerIdExt>::from_bytes(peer_id)?;
+        self.peer_state.tag(&peer_id, tag)?;
+
+        Ok(())
+    }
+
+    fn untag(&self, _: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()> {
+        let peer_id = <PeerId as PeerIdExt>::from_bytes(peer_id)?;
+        self.peer_state.untag(&peer_id, tag);
+
+        Ok(())
+    }
+
+    fn tag_consensus(&self, _: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()> {
+        let peer_ids = peer_ids
+            .into_iter()
+            .map(PeerId::from_pubkey_bytes)
+            .collect::<Result<Vec<_>, _>>()?;
+        self.peer_state.tag_consensus(peer_ids);
+
+        Ok(())
+    }
+}
+
 enum NetworkConnectionService {
     NoListen(ConnectionService<CoreProtocol>), // no listen address yet
     Ready(ConnectionService<CoreProtocol>),
@@ -152,9 +180,10 @@ pub struct NetworkService {
     rpc_map: Arc<RpcMap>,
 
     // Core service
-    net_conn_srv: Option<NetworkConnectionService>,
-    peer_mgr: Option<PeerManager>,
-    router: Option<MessageRouter<Snappy, SharedSessions>>,
+    net_conn_srv: Option<NetworkConnectionService>,
+    peer_mgr: Option<PeerManager>,
+    peer_mgr_handle: PeerManagerHandle,
+    router: Option<MessageRouter<Snappy, SharedSessions>>,
 
     // Metrics
     metrics: Option<Metrics<SharedSessions>>,
@@ -204,7 +233,7 @@ impl NetworkService {
         let proto = CoreProtocol::build()
             .ping(config.ping_interval, config.ping_timeout, mgr_tx.clone())
             .identify(peer_mgr_handle.clone(), mgr_tx.clone())
-            .discovery(peer_mgr_handle, mgr_tx.clone(), disc_sync_interval)
+            .discovery(peer_mgr_handle.clone(), mgr_tx.clone(), disc_sync_interval)
             .transmitter(raw_msg_tx.clone())
             .build();
 
@@ -249,6 +278,7 @@
 
             net_conn_srv: Some(NetworkConnectionService::NoListen(conn_srv)),
             peer_mgr: Some(peer_mgr),
+            peer_mgr_handle,
             router: Some(router),
 
             metrics: Some(metrics),
@@ -322,6 +352,7 @@ impl NetworkService {
             gossip: self.gossip.clone(),
            rpc: self.rpc.clone(),
             peer_trust: self.mgr_tx.clone(),
+            peer_state: self.peer_mgr_handle.clone(),
 
             #[cfg(feature = "diagnostic")]
             diagnostic: self.diagnostic.clone(),
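`NetworkServiceHandle` now carries a clone of `PeerManagerHandle` and implements the `Network` trait by plain delegation, separate from the `peer_trust` event channel used for trust reports. A rough sketch of that shape, assuming the handle is a cheap `Arc`-backed clone over shared peer state (all types below are stand-ins):

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

// Stand-in for PeerManagerHandle: cloning only bumps the Arc, so the service
// and every handle it gives out can keep their own copy.
#[derive(Clone)]
struct PeerManagerHandle {
    consensus: Arc<RwLock<HashSet<u64>>>,
}

impl PeerManagerHandle {
    fn tag_consensus(&self, peer_ids: Vec<u64>) {
        *self.consensus.write().unwrap() = peer_ids.into_iter().collect();
    }
}

// Stand-in for NetworkServiceHandle: the Network methods forward to the
// peer-manager handle instead of sending an event over a channel.
#[derive(Clone)]
struct ServiceHandle {
    peer_state: PeerManagerHandle,
}

impl ServiceHandle {
    fn tag_consensus(&self, peer_ids: Vec<u64>) -> Result<(), String> {
        self.peer_state.tag_consensus(peer_ids);
        Ok(())
    }
}

fn main() {
    let peer_state = PeerManagerHandle {
        consensus: Arc::new(RwLock::new(HashSet::new())),
    };
    let handle = ServiceHandle {
        peer_state: peer_state.clone(),
    };

    handle.tag_consensus(vec![7, 8]).unwrap();
    assert!(peer_state.consensus.read().unwrap().contains(&7));
}
```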
protocol/src/traits/consensus.rs (2 changes: 2 additions & 0 deletions)

@@ -128,6 +128,8 @@ pub trait CommonConsensusAdapter: Send + Sync {
         proposer: Address,
     ) -> ProtocolResult<Metadata>;
 
+    fn tag_consensus(&self, ctx: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()>;
+
     fn report_bad(&self, ctx: Context, feedback: TrustFeedback);
 
     fn set_args(&self, context: Context, timeout_gap: u64, cycles_limit: u64, max_tx_size: u64);
protocol/src/traits/network.rs (5 changes: 3 additions & 2 deletions)

@@ -157,8 +157,9 @@ pub trait Rpc: Send + Sync {
 }
 
 pub trait Network: Send + Sync {
-    fn add_tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>;
-    fn remove_tag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>;
+    fn tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>;
+    fn untag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>;
+    fn tag_consensus(&self, ctx: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()>;
 }
 
 pub trait PeerTrust: Send + Sync {
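The trait now speaks in `tag`/`untag`/`tag_consensus` over `PeerTag` values instead of the old `add_tag`/`remove_tag` names. `PeerTag` itself is not shown in this diff; a plausible minimal shape, purely as an assumption, is a small hashable enum kept in a per-peer tag set (in the real code that set's `insert` is fallible, which this sketch ignores):

```rust
use std::collections::HashSet;

// Hypothetical PeerTag: only the Consensus variant is exercised by this commit;
// the real enum lives elsewhere in the protocol crate and may have other variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PeerTag {
    Consensus,
}

fn main() {
    let mut tags: HashSet<PeerTag> = HashSet::new();

    // tag(..) / untag(..) ultimately insert into and remove from a per-peer
    // set along these lines.
    tags.insert(PeerTag::Consensus);
    assert!(tags.contains(&PeerTag::Consensus));

    tags.remove(&PeerTag::Consensus);
    assert!(!tags.contains(&PeerTag::Consensus));
}
```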
src/default_start.rs (12 changes: 11 additions & 1 deletion)

@@ -38,7 +38,7 @@ use core_network::{NetworkConfig, NetworkService};
 use core_storage::{adapter::rocks::RocksAdapter, ImplStorage, StorageError};
 use framework::binding::state::RocksTrieDB;
 use framework::executor::{ServiceExecutor, ServiceExecutorFactory};
-use protocol::traits::{APIAdapter, Context, MemPool, NodeInfo, ServiceMapping, Storage};
+use protocol::traits::{APIAdapter, Context, MemPool, Network, NodeInfo, ServiceMapping, Storage};
 use protocol::types::{Address, Block, BlockHeader, Genesis, Hash, Metadata, Proof, Validator};
 use protocol::{fixed_codec::FixedCodec, ProtocolResult};
 
@@ -399,6 +399,16 @@ pub async fn start<Mapping: 'static + ServiceMapping>(
         lock,
     ));
 
+    let pub_keys = metadata
+        .verifier_list
+        .iter()
+        .map(|v| v.pub_key.decode())
+        .collect();
+
+    network_service
+        .handle()
+        .tag_consensus(Context::new(), pub_keys)?;
+
     // Re-execute block from exec_height + 1 to current_height, so that init the
     // lost current status.
     log::info!("Re-execute from {} to {}", exec_height + 1, current_height);