Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upgrade p2p/tokio #2050

Merged
merged 1 commit into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use ckb_chain::chain::ChainService;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_logger::info_target;
use ckb_network::{
CKBProtocol, NetworkService, NetworkState, MAX_FRAME_LENGTH_ALERT, MAX_FRAME_LENGTH_RELAY,
MAX_FRAME_LENGTH_SYNC, MAX_FRAME_LENGTH_TIME,
BlockingFlag, CKBProtocol, NetworkService, NetworkState, MAX_FRAME_LENGTH_ALERT,
MAX_FRAME_LENGTH_RELAY, MAX_FRAME_LENGTH_SYNC, MAX_FRAME_LENGTH_TIME,
};
use ckb_network_alert::alert_relayer::AlertRelayer;
use ckb_resource::Resource;
Expand Down Expand Up @@ -82,6 +82,14 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
let alert_notifier = Arc::clone(alert_relayer.notifier());
let alert_verifier = Arc::clone(alert_relayer.verifier());

let mut no_blocking_flag = BlockingFlag::default();
no_blocking_flag.disable_all();

let mut blocking_recv_flag = BlockingFlag::default();
blocking_recv_flag.disable_connected();
blocking_recv_flag.disable_disconnected();
blocking_recv_flag.disable_notify();

let synchronizer_clone = synchronizer.clone();
let protocols = vec![
CKBProtocol::new(
Expand All @@ -91,6 +99,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_SYNC,
move || Box::new(synchronizer_clone.clone()),
Arc::clone(&network_state),
blocking_recv_flag,
),
CKBProtocol::new(
"rel".to_string(),
Expand All @@ -99,6 +108,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_RELAY,
move || Box::new(relayer.clone()),
Arc::clone(&network_state),
blocking_recv_flag,
),
CKBProtocol::new(
"tim".to_string(),
Expand All @@ -107,6 +117,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_TIME,
move || Box::new(net_timer.clone()),
Arc::clone(&network_state),
no_blocking_flag,
),
CKBProtocol::new(
"alt".to_string(),
Expand All @@ -115,6 +126,7 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> {
MAX_FRAME_LENGTH_ALERT,
move || Box::new(alert_relayer.clone()),
Arc::clone(&network_state),
no_blocking_flag,
),
];

Expand Down
4 changes: 2 additions & 2 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ ckb-util = { path = "../util" }
ckb-stop-handler = { path = "../util/stop-handler" }
ckb-logger = { path = "../util/logger" }
tokio = { version = "0.2.11", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking", "stream"] }
tokio-util = { version = "0.2.0", features = ["codec"] }
tokio-util = { version = "0.3.0", features = ["codec"] }
futures = "0.3"
crossbeam-channel = "0.3"
p2p = { version="0.3.0-alpha.3", package="tentacle", features = ["molc"] }
p2p = { version="0.3.0-alpha.4", package="tentacle", features = ["molc"] }
faketime = "0.2.0"
lazy_static = "1.3.0"
bs58 = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ pub use crate::{
pub use p2p::{
bytes, multiaddr,
secio::{PeerId, PublicKey},
service::{ServiceControl, SessionType, TargetSession},
service::{BlockingFlag, ServiceControl, SessionType, TargetSession},
traits::ServiceProtocol,
ProtocolId,
};
pub use tokio;

// Max message frame length for sync protocol: 2MB
// NOTE: update this value when block size limit changed
Expand Down
12 changes: 10 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use p2p::{
multiaddr::{self, Multiaddr},
secio::{self, PeerId},
service::{
ProtocolEvent, ProtocolHandle, Service, ServiceError, ServiceEvent, TargetProtocol,
TargetSession,
BlockingFlag, ProtocolEvent, ProtocolHandle, Service, ServiceError, ServiceEvent,
TargetProtocol, TargetSession,
},
traits::ServiceHandle,
utils::extract_peer_id,
Expand Down Expand Up @@ -848,6 +848,9 @@ impl NetworkService {
) -> NetworkService {
let config = &network_state.config;

let mut no_blocking_flag = BlockingFlag::default();
no_blocking_flag.disable_all();

// == Build special protocols

// TODO: how to deny banned node to open those protocols?
Expand All @@ -873,6 +876,7 @@ impl NetworkService {
ping_sender,
)))
})
.flag(no_blocking_flag)
.build();

// Discovery protocol
Expand All @@ -893,6 +897,7 @@ impl NetworkService {
config.discovery_local_address,
)))
})
.flag(no_blocking_flag)
.build();

// Identify protocol
Expand All @@ -911,6 +916,7 @@ impl NetworkService {
.service_handle(move || {
ProtocolHandle::Both(Box::new(IdentifyProtocol::new(identify_callback)))
})
.flag(no_blocking_flag)
.build();

// Feeler protocol
Expand All @@ -929,6 +935,7 @@ impl NetworkService {
let network_state = Arc::clone(&network_state);
move || ProtocolHandle::Both(Box::new(Feeler::new(Arc::clone(&network_state))))
})
.flag(no_blocking_flag)
.build();

let disconnect_message_meta = MetaBuilder::default()
Expand All @@ -942,6 +949,7 @@ impl NetworkService {
)
})
.service_handle(move || ProtocolHandle::Both(Box::new(DisconnectMessageProtocol)))
.flag(no_blocking_flag)
.build();

// == Build p2p service struct
Expand Down
5 changes: 2 additions & 3 deletions network/src/protocols/discovery/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ impl Decoder for DiscoveryCodec {
}
}

impl Encoder for DiscoveryCodec {
type Item = DiscoveryMessage;
impl Encoder<DiscoveryMessage> for DiscoveryCodec {
type Error = io::Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: DiscoveryMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(item.encode(), dst)
}
}
Expand Down
7 changes: 6 additions & 1 deletion network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use p2p::{
builder::MetaBuilder,
bytes::Bytes,
context::{ProtocolContext, ProtocolContextMutRef},
service::{ProtocolHandle, ProtocolMeta, ServiceControl, TargetSession},
service::{BlockingFlag, ProtocolHandle, ProtocolMeta, ServiceControl, TargetSession},
traits::ServiceProtocol,
ProtocolId, SessionId,
};
Expand Down Expand Up @@ -106,6 +106,7 @@ pub struct CKBProtocol {
max_frame_length: usize,
handler: Box<dyn Fn() -> Box<dyn CKBProtocolHandler + Send + 'static> + Send + 'static>,
network_state: Arc<NetworkState>,
flag: BlockingFlag,
}

impl CKBProtocol {
Expand All @@ -116,6 +117,7 @@ impl CKBProtocol {
max_frame_length: usize,
handler: F,
network_state: Arc<NetworkState>,
flag: BlockingFlag,
) -> Self {
CKBProtocol {
id,
Expand All @@ -128,6 +130,7 @@ impl CKBProtocol {
versions.sort_by(|a, b| b.cmp(a));
versions.to_vec()
},
flag,
}
}

Expand All @@ -146,6 +149,7 @@ impl CKBProtocol {
pub fn build(self) -> ProtocolMeta {
let protocol_name = self.protocol_name();
let max_frame_length = self.max_frame_length;
let flag = self.flag;
let supported_versions = self
.supported_versions
.iter()
Expand All @@ -171,6 +175,7 @@ impl CKBProtocol {
})
.before_send(compress)
.before_receive(|| Some(Box::new(decompress)))
.flag(flag)
.build()
}
}
Expand Down
8 changes: 6 additions & 2 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use crate::types::{ActiveChain, SyncShared};
use crate::{Status, StatusCode, BAD_MESSAGE_BAN_TIME};
use ckb_chain::chain::ChainController;
use ckb_logger::{debug_target, error_target, info_target, metric, trace_target, warn_target};
use ckb_network::{bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession};
use ckb_network::{
bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession,
};
use ckb_tx_pool::FeeRate;
use ckb_types::core::BlockView;
use ckb_types::{
Expand Down Expand Up @@ -704,7 +706,9 @@ impl CKBProtocolHandler for Relayer {
let start_time = Instant::now();
trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
match token {
TX_PROPOSAL_TOKEN => self.prune_tx_proposal_request(nc.as_ref()),
TX_PROPOSAL_TOKEN => {
tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
}
ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
_ => unreachable!(),
Expand Down
1 change: 1 addition & 0 deletions test/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Net {
1024 * 1024,
move || Box::new(DummyProtocolHandler { tx: tx.clone() }),
Arc::clone(&network_state),
Default::default(),
)
})
.collect();
Expand Down