Skip to content

Commit

Permalink
chore: upgrade p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 30, 2020
1 parent c4c4157 commit 9c8085e
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 28 deletions.
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use ckb_chain::chain::ChainService;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_logger::info_target;
use ckb_network::{
CKBProtocol, NetworkService, NetworkState, MAX_FRAME_LENGTH_ALERT, MAX_FRAME_LENGTH_RELAY,
MAX_FRAME_LENGTH_SYNC, MAX_FRAME_LENGTH_TIME,
BlockingFlag, CKBProtocol, NetworkService, NetworkState, MAX_FRAME_LENGTH_ALERT,
MAX_FRAME_LENGTH_RELAY, MAX_FRAME_LENGTH_SYNC, MAX_FRAME_LENGTH_TIME,
};
use ckb_network_alert::alert_relayer::AlertRelayer;
use ckb_resource::Resource;
Expand Down Expand Up @@ -82,6 +82,14 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
let alert_notifier = Arc::clone(alert_relayer.notifier());
let alert_verifier = Arc::clone(alert_relayer.verifier());

let mut no_blocking_flag = BlockingFlag::default();
no_blocking_flag.disable_all();

let mut blocking_recv_flag = BlockingFlag::default();
blocking_recv_flag.disable_connected();
blocking_recv_flag.disable_disconnected();
blocking_recv_flag.disable_notify();

let synchronizer_clone = synchronizer.clone();
let protocols = vec![
CKBProtocol::new(
Expand All @@ -91,6 +99,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_SYNC,
move || Box::new(synchronizer_clone.clone()),
Arc::clone(&network_state),
blocking_recv_flag,
),
CKBProtocol::new(
"rel".to_string(),
Expand All @@ -99,6 +108,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_RELAY,
move || Box::new(relayer.clone()),
Arc::clone(&network_state),
blocking_recv_flag,
),
CKBProtocol::new(
"tim".to_string(),
Expand All @@ -107,6 +117,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_TIME,
move || Box::new(net_timer.clone()),
Arc::clone(&network_state),
no_blocking_flag,
),
CKBProtocol::new(
"alt".to_string(),
Expand All @@ -115,6 +126,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_ALERT,
move || Box::new(alert_relayer.clone()),
Arc::clone(&network_state),
no_blocking_flag,
),
];

Expand Down
4 changes: 2 additions & 2 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ ckb-util = { path = "../util" }
ckb-stop-handler = { path = "../util/stop-handler" }
ckb-logger = { path = "../util/logger" }
tokio = { version = "0.2.11", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking", "stream"] }
tokio-util = { version = "0.2.0", features = ["codec"] }
tokio-util = { version = "0.3.0", features = ["codec"] }
futures = "0.3"
crossbeam-channel = "0.3"
p2p = { version="0.3.0-alpha.3", package="tentacle", features = ["molc"] }
p2p = { version="0.3.0-alpha.4", package="tentacle", features = ["molc"] }
faketime = "0.2.0"
lazy_static = "1.3.0"
bs58 = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ pub use crate::{
pub use p2p::{
bytes, multiaddr,
secio::{PeerId, PublicKey},
service::{ServiceControl, SessionType, TargetSession},
service::{BlockingFlag, ServiceControl, SessionType, TargetSession},
traits::ServiceProtocol,
ProtocolId,
};
pub use tokio;

// Max message frame length for sync protocol: 2MB
// NOTE: update this value when block size limit changed
Expand Down
12 changes: 10 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use p2p::{
multiaddr::{self, Multiaddr},
secio::{self, PeerId},
service::{
ProtocolEvent, ProtocolHandle, Service, ServiceError, ServiceEvent, TargetProtocol,
TargetSession,
BlockingFlag, ProtocolEvent, ProtocolHandle, Service, ServiceError, ServiceEvent,
TargetProtocol, TargetSession,
},
traits::ServiceHandle,
utils::extract_peer_id,
Expand Down Expand Up @@ -848,6 +848,9 @@ impl NetworkService {
) -> NetworkService {
let config = &network_state.config;

let mut no_blocking_flag = BlockingFlag::default();
no_blocking_flag.disable_all();

// == Build special protocols

// TODO: how to deny banned node to open those protocols?
Expand All @@ -873,6 +876,7 @@ impl NetworkService {
ping_sender,
)))
})
.flag(no_blocking_flag)
.build();

// Discovery protocol
Expand All @@ -893,6 +897,7 @@ impl NetworkService {
config.discovery_local_address,
)))
})
.flag(no_blocking_flag)
.build();

// Identify protocol
Expand All @@ -911,6 +916,7 @@ impl NetworkService {
.service_handle(move || {
ProtocolHandle::Both(Box::new(IdentifyProtocol::new(identify_callback)))
})
.flag(no_blocking_flag)
.build();

// Feeler protocol
Expand All @@ -929,6 +935,7 @@ impl NetworkService {
let network_state = Arc::clone(&network_state);
move || ProtocolHandle::Both(Box::new(Feeler::new(Arc::clone(&network_state))))
})
.flag(no_blocking_flag)
.build();

let disconnect_message_meta = MetaBuilder::default()
Expand All @@ -942,6 +949,7 @@ impl NetworkService {
)
})
.service_handle(move || ProtocolHandle::Both(Box::new(DisconnectMessageProtocol)))
.flag(no_blocking_flag)
.build();

// == Build p2p service struct
Expand Down
5 changes: 2 additions & 3 deletions network/src/protocols/discovery/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ impl Decoder for DiscoveryCodec {
}
}

impl Encoder for DiscoveryCodec {
type Item = DiscoveryMessage;
impl Encoder<DiscoveryMessage> for DiscoveryCodec {
type Error = io::Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: DiscoveryMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(item.encode(), dst)
}
}
Expand Down
7 changes: 6 additions & 1 deletion network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use p2p::{
builder::MetaBuilder,
bytes::Bytes,
context::{ProtocolContext, ProtocolContextMutRef},
service::{ProtocolHandle, ProtocolMeta, ServiceControl, TargetSession},
service::{BlockingFlag, ProtocolHandle, ProtocolMeta, ServiceControl, TargetSession},
traits::ServiceProtocol,
ProtocolId, SessionId,
};
Expand Down Expand Up @@ -106,6 +106,7 @@ pub struct CKBProtocol {
max_frame_length: usize,
handler: Box<dyn Fn() -> Box<dyn CKBProtocolHandler + Send + 'static> + Send + 'static>,
network_state: Arc<NetworkState>,
flag: BlockingFlag,
}

impl CKBProtocol {
Expand All @@ -116,6 +117,7 @@ impl CKBProtocol {
max_frame_length: usize,
handler: F,
network_state: Arc<NetworkState>,
flag: BlockingFlag,
) -> Self {
CKBProtocol {
id,
Expand All @@ -128,6 +130,7 @@ impl CKBProtocol {
versions.sort_by(|a, b| b.cmp(a));
versions.to_vec()
},
flag,
}
}

Expand All @@ -146,6 +149,7 @@ impl CKBProtocol {
pub fn build(self) -> ProtocolMeta {
let protocol_name = self.protocol_name();
let max_frame_length = self.max_frame_length;
let flag = self.flag;
let supported_versions = self
.supported_versions
.iter()
Expand All @@ -171,6 +175,7 @@ impl CKBProtocol {
})
.before_send(compress)
.before_receive(|| Some(Box::new(decompress)))
.flag(flag)
.build()
}
}
Expand Down
8 changes: 6 additions & 2 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use crate::types::{ActiveChain, SyncShared};
use crate::{Status, StatusCode, BAD_MESSAGE_BAN_TIME};
use ckb_chain::chain::ChainController;
use ckb_logger::{debug_target, error_target, info_target, metric, trace_target, warn_target};
use ckb_network::{bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession};
use ckb_network::{
bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession,
};
use ckb_tx_pool::FeeRate;
use ckb_types::core::BlockView;
use ckb_types::{
Expand Down Expand Up @@ -704,7 +706,9 @@ impl CKBProtocolHandler for Relayer {
let start_time = Instant::now();
trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
match token {
TX_PROPOSAL_TOKEN => self.prune_tx_proposal_request(nc.as_ref()),
TX_PROPOSAL_TOKEN => {
tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
}
ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
_ => unreachable!(),
Expand Down
1 change: 1 addition & 0 deletions test/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Net {
1024 * 1024,
move || Box::new(DummyProtocolHandler { tx: tx.clone() }),
Arc::clone(&network_state),
Default::default(),
)
})
.collect();
Expand Down

0 comments on commit 9c8085e

Please sign in to comment.