Skip to content

Commit

Permalink
refactor: increase ban tolerance for unexpected responses
Browse files Browse the repository at this point in the history
  • Loading branch information
yangby-cryptape committed Jun 28, 2024
1 parent 0379fe2 commit 6736d0d
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 20 deletions.
78 changes: 78 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jsonrpc-core = "18.0"
jsonrpc-derive = "18.0"
jsonrpc-http-server = "18.0"
jsonrpc-server-utils = "18.0"
governor = "0.6.3"

[dev-dependencies]
ckb-shared = "0.113.0"
Expand Down
2 changes: 1 addition & 1 deletion src/protocols/filter/block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl CKBProtocolHandler for FilterProtocol {

let item_name = msg.item_name();
let status = self.try_process(Arc::clone(&nc), peer, msg);
status.process(nc, peer, "BlockFilter", item_name);
status.process(nc, &self.peers, peer, "BlockFilter", item_name);
}

async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
Expand Down
2 changes: 1 addition & 1 deletion src/protocols/light_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl CKBProtocolHandler for LightClientProtocol {

let item_name = msg.item_name();
let status = self.try_process(nc.as_ref(), peer_index, msg);
status.process(nc, peer_index, "LightClient", item_name);
status.process(nc, self.peers(), peer_index, "LightClient", item_name);
}

async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
Expand Down
49 changes: 43 additions & 6 deletions src/protocols/light_client/peers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use std::{
collections::{HashMap, HashSet},
fmt, mem,
num::NonZeroU32,
sync::{Mutex, RwLock},
};

use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{
Expand All @@ -9,14 +16,12 @@ use ckb_types::{
H256, U256,
};
use dashmap::DashMap;
use std::{
collections::{HashMap, HashSet},
fmt, mem,
sync::RwLock,
};
use governor::{clock::DefaultClock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};

use super::prelude::*;
use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT};
use crate::protocols::{Status, StatusCode, BAD_MESSAGE_ALLOWED_EACH_HOUR, MESSAGE_TIMEOUT};

/// Keyed rate limiter, using governor's default keyed state store and default clock,
/// for limiting bad messages per key `T`; `Peers` instantiates it with `PeerIndex` keys.
pub type BadMessageRateLimiter<T> = RateLimiter<T, DefaultKeyedStateStore<T>, DefaultClock>;

pub struct Peers {
inner: DashMap<PeerIndex, Peer>,
Expand Down Expand Up @@ -45,6 +50,10 @@ pub struct Peers {

check_point_interval: BlockNumber,
start_check_point: (u32, packed::Byte32),

rate_limiter: Mutex<BadMessageRateLimiter<PeerIndex>>,
#[cfg(test)]
bad_message_allowed_each_hour: u32,
}

#[derive(Clone)]
Expand Down Expand Up @@ -1117,10 +1126,22 @@ impl Peers {
max_outbound_peers: u32,
check_point_interval: BlockNumber,
start_check_point: (u32, packed::Byte32),
bad_message_allowed_each_hour: u32,
) -> Self {
#[cfg(test)]
let max_outbound_peers = RwLock::new(max_outbound_peers);

let rate_limiter = {
let limit = if bad_message_allowed_each_hour == 0 {
BAD_MESSAGE_ALLOWED_EACH_HOUR
} else {
bad_message_allowed_each_hour
};
let max_burst = unsafe { NonZeroU32::new_unchecked(limit) };
let quota = Quota::per_hour(max_burst);
Mutex::new(RateLimiter::keyed(quota))
};

Self {
inner: Default::default(),
fetching_headers: DashMap::new(),
Expand All @@ -1130,6 +1151,9 @@ impl Peers {
max_outbound_peers,
check_point_interval,
start_check_point,
rate_limiter,
#[cfg(test)]
bad_message_allowed_each_hour,
}
}

Expand Down Expand Up @@ -1272,6 +1296,7 @@ impl Peers {
self.mark_fetching_headers_timeout(index);
self.mark_fetching_txs_timeout(index);
self.inner.remove(&index);
let _ignore_error = self.rate_limiter.lock().map(|inner| inner.retain_recent());
}

pub(crate) fn get_peers_index(&self) -> Vec<PeerIndex> {
Expand Down Expand Up @@ -1965,6 +1990,18 @@ impl Peers {
.map(|(peer_index, _)| peer_index)
.collect()
}

/// Checks `peer_index` against the bad-message rate limiter and reports whether the
/// peer should be banned.
///
/// Returns `true` when the limiter rejects the check for this peer, or when the
/// limiter's mutex is poisoned.
///
/// NOTE(review): every call goes through `check_key`, which presumably consumes one
/// unit of the peer's quota, so this is not a side-effect-free query; confirm against
/// the governor documentation.
pub(crate) fn should_ban(&self, peer_index: PeerIndex) -> bool {
    // In test builds a configured allowance of 0 means "ban on the first bad message".
    #[cfg(test)]
    if self.bad_message_allowed_each_hour == 0 {
        return true;
    }
    match self.rate_limiter.lock() {
        // The peer is spared only if the limiter accepts the check.
        Ok(limiter) => limiter.check_key(&peer_index).is_err(),
        // A poisoned lock is treated the same as a rejected check.
        Err(_) => true,
    }
}
}

fn if_verifiable_headers_are_same(lhs: &VerifiableHeader, rhs: &VerifiableHeader) -> bool {
Expand Down
3 changes: 3 additions & 0 deletions src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ pub(crate) use relayer::{PendingTxs, RelayProtocol};
pub(crate) use status::{Status, StatusCode};
pub(crate) use synchronizer::SyncProtocol;

// The period to ban a peer for bad messages.
pub const BAD_MESSAGE_BAN_TIME: Duration = Duration::from_secs(5 * 60);
// Ban a peer if unexpected responses from that peer reach the limit.
pub const BAD_MESSAGE_ALLOWED_EACH_HOUR: u32 = 10;
// Ban a peer if it reaches any timeout.
pub const MESSAGE_TIMEOUT: u64 = 60 * 1000;

Expand Down
14 changes: 10 additions & 4 deletions src/protocols/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, sync::Arc, time::Duration};
use ckb_network::{CKBProtocolContext, PeerIndex};
use log::{debug, error, trace, warn};

use super::BAD_MESSAGE_BAN_TIME;
use super::{Peers, BAD_MESSAGE_BAN_TIME};

/// StatusCodes indicate whether a specific operation has been successfully completed.
///
Expand Down Expand Up @@ -157,10 +157,15 @@ impl Status {
}

/// Whether the session should be banned.
pub fn should_ban(&self) -> Option<Duration> {
pub fn should_ban(&self, peers: &Peers, index: PeerIndex) -> Option<Duration> {
let code = self.code() as u16;
if (400..500).contains(&code) {
Some(BAD_MESSAGE_BAN_TIME)
// TODO: Reorder the error codes so that the codes for malformed messages, which lead to an immediate ban, are grouped together.
if code == 400 || peers.should_ban(index) {
Some(BAD_MESSAGE_BAN_TIME)
} else {
None
}
} else {
None
}
Expand All @@ -180,11 +185,12 @@ impl Status {
pub fn process(
&self,
nc: Arc<dyn CKBProtocolContext + Sync>,
peers: &Peers,
index: PeerIndex,
protocol: &str,
message: &str,
) {
if let Some(ban_time) = self.should_ban() {
if let Some(ban_time) = self.should_ban(peers, index) {
error!(
"{}Protocol.received {} from {}, result {}, ban {:?}",
protocol, message, index, self, ban_time
Expand Down
3 changes: 2 additions & 1 deletion src/subcmds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
error::{Error, Result},
protocols::{
FilterProtocol, LightClientProtocol, Peers, PendingTxs, RelayProtocol, SyncProtocol,
CHECK_POINT_INTERVAL,
BAD_MESSAGE_ALLOWED_EACH_HOUR, CHECK_POINT_INTERVAL,
},
service::Service,
storage::Storage,
Expand Down Expand Up @@ -65,6 +65,7 @@ impl RunConfig {
max_outbound_peers,
CHECK_POINT_INTERVAL,
storage.get_last_check_point(),
BAD_MESSAGE_ALLOWED_EACH_HOUR,
));
let sync_protocol = SyncProtocol::new(storage.clone(), Arc::clone(&peers));
let relay_protocol_v2 = RelayProtocol::new(
Expand Down
6 changes: 6 additions & 0 deletions src/tests/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,17 @@ pub(crate) trait ChainExt {
fn consensus(&self) -> &Consensus;

/// Creates the shared `Peers` used by tests with a bad-message allowance of `0`.
///
/// In test builds `Peers::should_ban` returns `true` immediately for an allowance of
/// `0`, so peers created here are banned on their first bad message.
fn create_peers(&self) -> Arc<Peers> {
    self.create_peers_with_parameters(0)
}

/// Creates the shared `Peers` used by tests with a caller-chosen bad-message allowance.
///
/// The remaining arguments are fixed: one outbound peer, the `CHECK_POINT_INTERVAL`
/// constant, and the last check point read from the client storage.
fn create_peers_with_parameters(&self, bad_message_allowed_each_hour: u32) -> Arc<Peers> {
    Arc::new(Peers::new(
        1, // max_outbound_peers
        CHECK_POINT_INTERVAL,
        self.client_storage().get_last_check_point(),
        bad_message_allowed_each_hour,
    ))
}
Expand Down
7 changes: 2 additions & 5 deletions src/tests/protocols/block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ async fn test_block_filter_ignore_start_number() {
.set(content)
.build();

let peer_index = PeerIndex::new(3);
protocol
.received(nc.context(), peer_index, message.as_bytes())
.await;
Expand Down Expand Up @@ -131,7 +130,6 @@ async fn test_block_filter_empty_filters() {
.set(content)
.build();

let peer_index = PeerIndex::new(3);
protocol
.received(nc.context(), peer_index, message.as_bytes())
.await;
Expand All @@ -144,6 +142,7 @@ async fn test_block_filter_empty_filters() {
async fn test_block_filter_invalid_filters_count() {
let chain = MockChain::new_with_dummy_pow("test-block-filter");
let nc = MockNetworkContext::new(SupportProtocols::Filter);
let bad_message_allowed_each_hour = 5;

let min_filtered_block_number = 3;
chain.client_storage().update_filter_scripts(
Expand All @@ -166,7 +165,7 @@ async fn test_block_filter_invalid_filters_count() {
None,
Default::default(),
);
let peers = chain.create_peers();
let peers = chain.create_peers_with_parameters(bad_message_allowed_each_hour);
peers.add_peer(peer_index);
peers.mock_prove_state(peer_index, tip_header).unwrap();
peers
Expand All @@ -181,11 +180,9 @@ async fn test_block_filter_invalid_filters_count() {
.set(content)
.build();

let peer_index = PeerIndex::new(3);
protocol
.received(nc.context(), peer_index, message.as_bytes())
.await;

assert_eq!(
nc.has_banned(peer_index).map(|(duration, _)| duration),
Some(BAD_MESSAGE_BAN_TIME)
Expand Down
Loading

0 comments on commit 6736d0d

Please sign in to comment.