Skip to content

Commit

Permalink
feat: add network protocol config
Browse files Browse the repository at this point in the history
  • Loading branch information
quake committed Apr 5, 2021
1 parent c0e8a2b commit 776d300
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 77 deletions.
115 changes: 67 additions & 48 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::services::{
protocol_type_checker::ProtocolTypeCheckerService,
};
use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, PublicKey, ServiceControl};
use ckb_app_config::NetworkConfig;
use ckb_app_config::{NetworkConfig, SupportProtocol};
use ckb_logger::{debug, error, info, trace, warn};
use ckb_spawn::Spawn;
use ckb_stop_handler::{SignalSender, StopHandler};
Expand Down Expand Up @@ -776,7 +776,7 @@ impl<T: ExitHandler> ServiceHandle for EventHandler<T> {
pub struct NetworkService<T> {
p2p_service: Service<EventHandler<T>>,
network_state: Arc<NetworkState>,
ping_controller: Sender<()>,
ping_controller: Option<Sender<()>>,
// Background services
bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
version: String,
Expand All @@ -793,60 +793,78 @@ impl<T: ExitHandler> NetworkService<T> {
exit_handler: T,
) -> Self {
let config = &network_state.config;
// == Build special protocols

// TODO: how to deny banned node to open those protocols?
// Ping protocol
let ping_interval = Duration::from_secs(config.ping_interval_secs);
let ping_timeout = Duration::from_secs(config.ping_timeout_secs);

let ping_network_state = Arc::clone(&network_state);
let (ping_handler, ping_controller) =
PingHandler::new(ping_interval, ping_timeout, ping_network_state);
let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(ping_handler))
});
// == Build p2p service struct
let mut protocol_metas = protocols
.into_iter()
.map(CKBProtocol::build)
.collect::<Vec<_>>();

// Discovery protocol
let addr_mgr = DiscoveryAddressManager {
network_state: Arc::clone(&network_state),
discovery_local_address: config.discovery_local_address,
};
let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(addr_mgr)))
});
// == Build special protocols

// Identify protocol
// Identify is a core protocol, user cannot disable it via config
let identify_callback =
IdentifyCallback::new(Arc::clone(&network_state), name, version.clone());
let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
});
protocol_metas.push(identify_meta);

// Feeler protocol
let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
let network_state = Arc::clone(&network_state);
move || ProtocolHandle::Both(Box::new(Feeler::new(Arc::clone(&network_state))))
});
// TODO: how to deny banned node to open those protocols?
// Ping protocol
let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
let ping_interval = Duration::from_secs(config.ping_interval_secs);
let ping_timeout = Duration::from_secs(config.ping_timeout_secs);

let ping_network_state = Arc::clone(&network_state);
let (ping_handler, ping_controller) =
PingHandler::new(ping_interval, ping_timeout, ping_network_state);
let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(ping_handler))
});
protocol_metas.push(ping_meta);
Some(ping_controller)
} else {
None
};

let disconnect_message_state = Arc::clone(&network_state);
let disconnect_message_meta = SupportProtocols::DisconnectMessage
.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
disconnect_message_state,
)))
// Discovery protocol
if config
.support_protocols
.contains(&SupportProtocol::Discovery)
{
let addr_mgr = DiscoveryAddressManager {
network_state: Arc::clone(&network_state),
discovery_local_address: config.discovery_local_address,
};
let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(addr_mgr)))
});
protocol_metas.push(disc_meta);
}

// == Build p2p service struct
let mut protocol_metas = protocols
.into_iter()
.map(CKBProtocol::build)
.collect::<Vec<_>>();
protocol_metas.push(feeler_meta);
protocol_metas.push(disconnect_message_meta);
protocol_metas.push(ping_meta);
protocol_metas.push(disc_meta);
protocol_metas.push(identify_meta);
// Feeler protocol
if config.support_protocols.contains(&SupportProtocol::Feeler) {
let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
let network_state = Arc::clone(&network_state);
move || ProtocolHandle::Both(Box::new(Feeler::new(Arc::clone(&network_state))))
});
protocol_metas.push(feeler_meta);
}

// DisconnectMessage protocol
if config
.support_protocols
.contains(&SupportProtocol::DisconnectMessage)
{
let disconnect_message_state = Arc::clone(&network_state);
let disconnect_message_meta = SupportProtocols::DisconnectMessage
.build_meta_with_service_handle(move || {
ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
disconnect_message_state,
)))
});
protocol_metas.push(disconnect_message_meta);
}

let mut service_builder = ServiceBuilder::default();
let mut yamux_config = YamuxConfig::default();
Expand Down Expand Up @@ -1142,7 +1160,7 @@ pub struct NetworkController {
version: String,
network_state: Arc<NetworkState>,
p2p_control: ServiceControl,
ping_controller: Sender<()>,
ping_controller: Option<Sender<()>>,
stop: StopHandler<()>,
}

Expand Down Expand Up @@ -1320,8 +1338,9 @@ impl NetworkController {

/// Try ping all connected peers
pub fn ping_peers(&self) {
let mut ping_controller = self.ping_controller.clone();
let _ignore = ping_controller.try_send(());
if let Some(mut ping_controller) = self.ping_controller.clone() {
let _ignore = ping_controller.try_send(());
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion util/app-config/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use miner::{
ClientConfig as MinerClientConfig, Config as MinerConfig, DummyConfig, EaglesongSimpleConfig,
ExtraHashFunction, WorkerConfig as MinerWorkerConfig,
};
pub use network::{Config as NetworkConfig, HeaderMapConfig, SyncConfig};
pub use network::{Config as NetworkConfig, HeaderMapConfig, SupportProtocol, SyncConfig};
pub use network_alert::Config as NetworkAlertConfig;
pub use notify::Config as NotifyConfig;
pub use rpc::{Config as RpcConfig, Module as RpcModule};
Expand Down
31 changes: 31 additions & 0 deletions util/app-config/src/configs/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub struct Config {
/// `bootnodes`.
#[serde(default)]
pub bootnode_mode: bool,
/// Supported protocols list
#[serde(default = "default_support_all_protocols")]
pub support_protocols: Vec<SupportProtocol>,
/// Max send buffer size in bytes.
pub max_send_buffer: Option<usize>,
/// Network use reuse port or not
Expand Down Expand Up @@ -112,6 +115,34 @@ impl Default for HeaderMapConfig {
}
}

#[derive(Clone, Debug, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[allow(missing_docs)]
pub enum SupportProtocol {
Ping,
Discovery,
Identify,
Feeler,
DisconnectMessage,
Sync,
Relay,
Time,
Alert,
}

fn default_support_all_protocols() -> Vec<SupportProtocol> {
vec![
SupportProtocol::Ping,
SupportProtocol::Discovery,
SupportProtocol::Identify,
SupportProtocol::Feeler,
SupportProtocol::DisconnectMessage,
SupportProtocol::Sync,
SupportProtocol::Relay,
SupportProtocol::Time,
SupportProtocol::Alert,
]
}

pub(crate) fn generate_random_key() -> [u8; 32] {
loop {
let mut key: [u8; 32] = [0; 32];
Expand Down
66 changes: 38 additions & 28 deletions util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern crate num_cpus;

mod migrations;

use ckb_app_config::{BlockAssemblerConfig, ExitCode, RunArgs};
use ckb_app_config::{BlockAssemblerConfig, ExitCode, RunArgs, SupportProtocol};
use ckb_async_runtime::Handle;
use ckb_build_info::Version;
use ckb_chain::chain::{ChainController, ChainService};
Expand Down Expand Up @@ -300,15 +300,41 @@ impl Launcher {
NetworkState::from_config(self.args.config.network.clone())
.expect("Init network state failed"),
);

// Sync is a core protocol, user cannot disable it via config
let synchronizer = Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared));
let mut protocols = vec![CKBProtocol::new_with_support_protocol(
SupportProtocols::Sync,
Box::new(synchronizer),
Arc::clone(&network_state),
)];

let support_protocols = &self.args.config.network.support_protocols;

if support_protocols.contains(&SupportProtocol::Relay) {
let relayer = Relayer::new(
chain_controller.clone(),
Arc::clone(&sync_shared),
self.args.config.tx_pool.min_fee_rate,
self.args.config.tx_pool.max_tx_verify_cycles,
);

protocols.push(CKBProtocol::new_with_support_protocol(
SupportProtocols::Relay,
Box::new(relayer),
Arc::clone(&network_state),
));
}

if support_protocols.contains(&SupportProtocol::Time) {
let net_timer = NetTimeProtocol::default();
protocols.push(CKBProtocol::new_with_support_protocol(
SupportProtocols::Time,
Box::new(net_timer),
Arc::clone(&network_state),
));
}

let relayer = Relayer::new(
chain_controller.clone(),
Arc::clone(&sync_shared),
self.args.config.tx_pool.min_fee_rate,
self.args.config.tx_pool.max_tx_verify_cycles,
);
let net_timer = NetTimeProtocol::default();
let alert_signature_config = self.args.config.alert_signature.clone().unwrap_or_default();
let alert_relayer = AlertRelayer::new(
self.version.to_string(),
Expand All @@ -318,29 +344,13 @@ impl Launcher {

let alert_notifier = Arc::clone(alert_relayer.notifier());
let alert_verifier = Arc::clone(alert_relayer.verifier());

let protocols = vec![
CKBProtocol::new_with_support_protocol(
SupportProtocols::Sync,
Box::new(synchronizer),
Arc::clone(&network_state),
),
CKBProtocol::new_with_support_protocol(
SupportProtocols::Relay,
Box::new(relayer),
Arc::clone(&network_state),
),
CKBProtocol::new_with_support_protocol(
SupportProtocols::Time,
Box::new(net_timer),
Arc::clone(&network_state),
),
CKBProtocol::new_with_support_protocol(
if support_protocols.contains(&SupportProtocol::Alert) {
protocols.push(CKBProtocol::new_with_support_protocol(
SupportProtocols::Alert,
Box::new(alert_relayer),
Arc::clone(&network_state),
),
];
));
}

let required_protocol_ids = vec![SupportProtocols::Sync.protocol_id()];

Expand Down

0 comments on commit 776d300

Please sign in to comment.