Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): tag consensus peers #6

Merged
merged 2 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use core_network::{PeerId, PeerIdExt};

use protocol::traits::{
CommonConsensusAdapter, ConsensusAdapter, Context, ExecutorFactory, ExecutorParams,
ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, PeerTrust, Priority, Rpc,
ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, Network, PeerTrust, Priority, Rpc,
ServiceMapping, Storage, SynchronizationAdapter, TrustFeedback,
};
use protocol::types::{
Expand All @@ -45,7 +45,7 @@ const OVERLORD_GAP: usize = 10;
pub struct OverlordConsensusAdapter<
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage,
DB: cita_trie::DB,
Mapping: ServiceMapping,
Expand All @@ -68,7 +68,7 @@ impl<EF, M, N, S, DB, Mapping> ConsensusAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<EF, M, N, S, DB, Mapping> SynchronizationAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -354,7 +354,7 @@ impl<EF, M, N, S, DB, Mapping> CommonConsensusAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -492,6 +492,15 @@ where
Ok(serde_json::from_str(&exec_resp.succeed_data).expect("Decode metadata failed!"))
}

fn tag_consensus(&self, ctx: Context, pub_keys: Vec<Bytes>) -> ProtocolResult<()> {
let peer_ids_bytes = pub_keys
.iter()
.map(|pk| PeerId::from_pubkey_bytes(pk).map(PeerIdExt::into_bytes_ext))
.collect::<Result<_, _>>()?;

self.network.tag_consensus(ctx, peer_ids_bytes)
}

#[muta_apm::derive::tracing_span(kind = "consensus.adapter")]
fn report_bad(&self, ctx: Context, feedback: TrustFeedback) {
self.network.report(ctx, feedback);
Expand Down Expand Up @@ -808,7 +817,7 @@ impl<EF, M, N, S, DB, Mapping> OverlordConsensusAdapter<EF, M, N, S, DB, Mapping
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down
7 changes: 7 additions & 0 deletions core/consensus/src/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
metadata.max_tx_size,
);

let pub_keys = metadata
.verifier_list
.iter()
.map(|v| v.pub_key.decode())
.collect();
self.adapter.tag_consensus(ctx.clone(), pub_keys)?;

log::info!(
"[synchronization]: commit_block, committing block header: {}, committing proof:{:?}",
block.header.clone(),
Expand Down
4 changes: 4 additions & 0 deletions core/consensus/src/tests/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ impl CommonConsensusAdapter for MockCommonConsensusAdapter {
})
}

fn tag_consensus(&self, _: Context, _: Vec<Bytes>) -> ProtocolResult<()> {
Ok(())
}

fn report_bad(&self, _ctx: Context, _feedback: TrustFeedback) {}

fn set_args(
Expand Down
57 changes: 53 additions & 4 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,19 @@ impl Deref for ArcSession {
}

struct Inner {
sessions: RwLock<HashSet<ArcSession>>,
peers: RwLock<HashSet<ArcPeer>>,
consensus: RwLock<HashSet<PeerId>>,
sessions: RwLock<HashSet<ArcSession>>,
peers: RwLock<HashSet<ArcPeer>>,

listen: RwLock<HashSet<PeerMultiaddr>>,
}

impl Inner {
pub fn new() -> Self {
Inner {
sessions: Default::default(),
peers: Default::default(),
consensus: Default::default(),
sessions: Default::default(),
peers: Default::default(),

listen: Default::default(),
}
Expand Down Expand Up @@ -427,6 +429,53 @@ impl PeerManagerHandle {

listen.into_iter().map(sanitize).collect()
}

pub fn tag(&self, peer_id: &PeerId, tag: PeerTag) -> Result<(), NetworkError> {
let consensus_tag = tag == PeerTag::Consensus;

if let Some(peer) = self.inner.peer(peer_id) {
peer.tags.insert(tag)?;
} else {
let peer = ArcPeer::new(peer_id.to_owned());
peer.tags.insert(tag)?;
self.inner.add_peer(peer);
}

if consensus_tag {
self.inner.consensus.write().insert(peer_id.to_owned());
}

Ok(())
}

pub fn untag(&self, peer_id: &PeerId, tag: &PeerTag) {
if let Some(peer) = self.inner.peer(peer_id) {
peer.tags.remove(tag);
}

if tag == &PeerTag::Consensus {
self.inner.consensus.write().remove(peer_id);
}
}

pub fn tag_consensus(&self, peer_ids: Vec<PeerId>) {
{
for peer_id in self.inner.consensus.read().iter() {
if let Some(peer) = self.inner.peer(peer_id) {
peer.tags.remove(&PeerTag::Consensus)
}
}
}

for peer_id in peer_ids.iter() {
let _ = self.tag(peer_id, PeerTag::Consensus);
}

{
let id_set = HashSet::from_iter(peer_ids);
*self.inner.consensus.write() = id_set;
}
}
}

pub struct PeerManager {
Expand Down
46 changes: 46 additions & 0 deletions core/network/src/peer_manager/test_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2739,3 +2739,49 @@ async fn should_setup_trust_metric_if_none_on_session_blocked() {
"should have 1 bad event"
);
}

#[tokio::test]
async fn should_able_to_tag_peer() {
let (mgr, _conn_rx) = make_manager(0, 20);
let handle = mgr.inner.handle();

let peer = make_peer(2077);
handle.tag(&peer.id, PeerTag::Consensus).unwrap();

let peer = mgr.core_inner().peer(&peer.id).unwrap();
assert!(peer.tags.contains(&PeerTag::Consensus));
}

#[tokio::test]
async fn should_able_to_untag_peer() {
let (mgr, _conn_rx) = make_manager(0, 20);
let handle = mgr.inner.handle();

let peer = make_peer(2077);
handle.tag(&peer.id, PeerTag::Consensus).unwrap();

let peer = mgr.core_inner().peer(&peer.id).unwrap();
assert!(peer.tags.contains(&PeerTag::Consensus));

handle.untag(&peer.id, &PeerTag::Consensus);
assert!(!peer.tags.contains(&PeerTag::Consensus));
}

#[tokio::test]
async fn should_remove_old_consensus_peer_tag_when_tag_consensus() {
let (mgr, _conn_rx) = make_manager(0, 20);
let handle = mgr.inner.handle();

let peer = make_peer(2077);
handle.tag(&peer.id, PeerTag::Consensus).unwrap();

let peer = mgr.core_inner().peer(&peer.id).unwrap();
assert!(peer.tags.contains(&PeerTag::Consensus));

let new_consensus = make_peer(3077);
handle.tag_consensus(vec![new_consensus.owned_id()]);

let new_consensus = mgr.core_inner().peer(&new_consensus.id).unwrap();
assert!(new_consensus.tags.contains(&PeerTag::Consensus));
assert!(!peer.tags.contains(&PeerTag::Consensus));
}
45 changes: 38 additions & 7 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use futures::{
use log::{debug, error, info};
use protocol::{
traits::{
Context, Gossip, MessageCodec, MessageHandler, PeerTrust, Priority, Rpc, TrustFeedback,
Context, Gossip, MessageCodec, MessageHandler, Network, PeerTag, PeerTrust, Priority, Rpc,
TrustFeedback,
},
Bytes, ProtocolResult,
};
Expand All @@ -36,20 +37,21 @@ use crate::{
message::RawSessionMessage,
metrics::Metrics,
outbound::{NetworkGossip, NetworkRpc},
peer_manager::{PeerManager, PeerManagerConfig, SharedSessions},
peer_manager::{PeerManager, PeerManagerConfig, PeerManagerHandle, SharedSessions},
protocols::CoreProtocol,
reactor::{MessageRouter, Reactor},
rpc_map::RpcMap,
selfcheck::SelfCheck,
traits::NetworkContext,
NetworkConfig,
NetworkConfig, PeerIdExt,
};

#[derive(Clone)]
pub struct NetworkServiceHandle {
gossip: NetworkGossip<ConnectionServiceControl<CoreProtocol, SharedSessions>, Snappy>,
rpc: NetworkRpc<ConnectionServiceControl<CoreProtocol, SharedSessions>, Snappy>,
peer_trust: UnboundedSender<PeerManagerEvent>,
peer_state: PeerManagerHandle,

#[cfg(feature = "diagnostic")]
pub diagnostic: Diagnostic,
Expand Down Expand Up @@ -128,6 +130,32 @@ impl PeerTrust for NetworkServiceHandle {
}
}

impl Network for NetworkServiceHandle {
fn tag(&self, _: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()> {
let peer_id = <PeerId as PeerIdExt>::from_bytes(peer_id)?;
self.peer_state.tag(&peer_id, tag)?;

Ok(())
}

fn untag(&self, _: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()> {
let peer_id = <PeerId as PeerIdExt>::from_bytes(peer_id)?;
self.peer_state.untag(&peer_id, tag);

Ok(())
}

fn tag_consensus(&self, _: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()> {
let peer_ids = peer_ids
.into_iter()
.map(<PeerId as PeerIdExt>::from_bytes)
.collect::<Result<Vec<_>, _>>()?;
self.peer_state.tag_consensus(peer_ids);

Ok(())
}
}

enum NetworkConnectionService {
NoListen(ConnectionService<CoreProtocol>), // no listen address yet
Ready(ConnectionService<CoreProtocol>),
Expand All @@ -152,9 +180,10 @@ pub struct NetworkService {
rpc_map: Arc<RpcMap>,

// Core service
net_conn_srv: Option<NetworkConnectionService>,
peer_mgr: Option<PeerManager>,
router: Option<MessageRouter<Snappy, SharedSessions>>,
net_conn_srv: Option<NetworkConnectionService>,
peer_mgr: Option<PeerManager>,
peer_mgr_handle: PeerManagerHandle,
router: Option<MessageRouter<Snappy, SharedSessions>>,

// Metrics
metrics: Option<Metrics<SharedSessions>>,
Expand Down Expand Up @@ -204,7 +233,7 @@ impl NetworkService {
let proto = CoreProtocol::build()
.ping(config.ping_interval, config.ping_timeout, mgr_tx.clone())
.identify(peer_mgr_handle.clone(), mgr_tx.clone())
.discovery(peer_mgr_handle, mgr_tx.clone(), disc_sync_interval)
.discovery(peer_mgr_handle.clone(), mgr_tx.clone(), disc_sync_interval)
.transmitter(raw_msg_tx.clone())
.build();

Expand Down Expand Up @@ -249,6 +278,7 @@ impl NetworkService {

net_conn_srv: Some(NetworkConnectionService::NoListen(conn_srv)),
peer_mgr: Some(peer_mgr),
peer_mgr_handle,
router: Some(router),

metrics: Some(metrics),
Expand Down Expand Up @@ -322,6 +352,7 @@ impl NetworkService {
gossip: self.gossip.clone(),
rpc: self.rpc.clone(),
peer_trust: self.mgr_tx.clone(),
peer_state: self.peer_mgr_handle.clone(),

#[cfg(feature = "diagnostic")]
diagnostic: self.diagnostic.clone(),
Expand Down
2 changes: 2 additions & 0 deletions protocol/src/traits/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub trait CommonConsensusAdapter: Send + Sync {
proposer: Address,
) -> ProtocolResult<Metadata>;

fn tag_consensus(&self, ctx: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()>;

fn report_bad(&self, ctx: Context, feedback: TrustFeedback);

fn set_args(&self, context: Context, timeout_gap: u64, cycles_limit: u64, max_tx_size: u64);
Expand Down
5 changes: 3 additions & 2 deletions protocol/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ pub trait Rpc: Send + Sync {
}

pub trait Network: Send + Sync {
fn add_tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>;
fn remove_tag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>;
fn tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>;
fn untag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>;
fn tag_consensus(&self, ctx: Context, peer_ids: Vec<Bytes>) -> ProtocolResult<()>;
}

pub trait PeerTrust: Send + Sync {
Expand Down
12 changes: 11 additions & 1 deletion src/default_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use core_network::{NetworkConfig, NetworkService};
use core_storage::{adapter::rocks::RocksAdapter, ImplStorage, StorageError};
use framework::binding::state::RocksTrieDB;
use framework::executor::{ServiceExecutor, ServiceExecutorFactory};
use protocol::traits::{APIAdapter, Context, MemPool, NodeInfo, ServiceMapping, Storage};
use protocol::traits::{APIAdapter, Context, MemPool, Network, NodeInfo, ServiceMapping, Storage};
use protocol::types::{Address, Block, BlockHeader, Genesis, Hash, Metadata, Proof, Validator};
use protocol::{fixed_codec::FixedCodec, ProtocolResult};

Expand Down Expand Up @@ -399,6 +399,16 @@ pub async fn start<Mapping: 'static + ServiceMapping>(
lock,
));

let pub_keys = metadata
.verifier_list
.iter()
.map(|v| v.pub_key.decode())
.collect();

network_service
.handle()
.tag_consensus(Context::new(), pub_keys)?;

// Re-execute block from exec_height + 1 to current_height, so that init the
// lost current status.
log::info!("Re-execute from {} to {}", exec_height + 1, current_height);
Expand Down
Loading