Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: panic when no enough block filter hashes #155

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/protocols/filter/components/block_filters_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_types::core::BlockNumber;
use ckb_types::utilities::calc_filter_hash;
use ckb_types::{packed, prelude::*};
use log::{info, trace, warn};
use log::{debug, info, trace, warn};
use rand::seq::SliceRandom;
use std::{cmp, sync::Arc};

Expand Down Expand Up @@ -56,8 +56,16 @@ impl<'a> BlockFiltersProcess<'a> {

let block_filters = self.message.to_entity();
let start_number: BlockNumber = block_filters.start_number().unpack();
let filters_count = block_filters.filters().len();
let blocks_count = block_filters.block_hashes().len();

debug!(
"recieved block filters: start number: {start_number}, \
filters count: {filters_count}, blocks count: {blocks_count}."
);

let min_filtered_block_number = self.filter.storage.get_min_filtered_block_number();
debug!("current min filtered block number: {min_filtered_block_number}");
if min_filtered_block_number + 1 != start_number {
info!(
"ignoring, the start_number of block_filters message {} is not continuous with min_filtered_block_number: {}",
Expand All @@ -73,9 +81,6 @@ impl<'a> BlockFiltersProcess<'a> {
return Status::ok();
}

let filters_count = block_filters.filters().len();
let blocks_count = block_filters.block_hashes().len();

if filters_count != blocks_count {
let error_message = format!(
"filters length ({}) not equal to block_hashes length ({})",
Expand Down Expand Up @@ -156,6 +161,17 @@ impl<'a> BlockFiltersProcess<'a> {
(finalized_check_point_hash, latest_block_filter_hashes)
} else {
let start_index = (start_number - finalized_check_point_number) as usize - 2;
if start_index >= latest_block_filter_hashes.len() {
info!(
"ignoring, no enough data for block filter hashes, \
finalized number: {finalized_check_point_number}, \
finalized index: {finalized_check_point_index}, \
start number: {start_number}, start index: {start_index}, \
filter hashes length {}",
latest_block_filter_hashes.len()
);
return Status::ok();
}
let parent_hash = latest_block_filter_hashes[start_index].clone();
latest_block_filter_hashes.drain(..=start_index);
(parent_hash, latest_block_filter_hashes)
Expand Down
19 changes: 19 additions & 0 deletions src/protocols/light_client/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ pub struct Peers {
// - Include at the next cached check point.
cached_block_filter_hashes: RwLock<(u32, Vec<packed::Byte32>)>,

#[cfg(not(test))]
max_outbound_peers: u32,

#[cfg(test)]
max_outbound_peers: RwLock<u32>,

check_point_interval: BlockNumber,
start_check_point: (u32, packed::Byte32),
}
Expand Down Expand Up @@ -1105,6 +1110,9 @@ impl Peers {
check_point_interval: BlockNumber,
start_check_point: (u32, packed::Byte32),
) -> Self {
#[cfg(test)]
let max_outbound_peers = RwLock::new(max_outbound_peers);

Self {
inner: Default::default(),
last_headers: Default::default(),
Expand Down Expand Up @@ -1231,10 +1239,21 @@ impl Peers {
&self.matched_blocks
}

#[cfg(not(test))]
pub(crate) fn get_max_outbound_peers(&self) -> u32 {
self.max_outbound_peers
}

#[cfg(test)]
pub(crate) fn get_max_outbound_peers(&self) -> u32 {
*self.max_outbound_peers.read().expect("poisoned")
}

#[cfg(test)]
pub(crate) fn set_max_outbound_peers(&self, max_outbound_peers: u32) {
*self.max_outbound_peers.write().expect("poisoned") = max_outbound_peers;
}

pub(crate) fn add_peer(&self, index: PeerIndex) {
let peer = Peer::new(self.check_point_interval, self.start_check_point.clone());
self.inner.insert(index, peer);
Expand Down
93 changes: 93 additions & 0 deletions src/tests/protocols/block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,3 +726,96 @@ async fn test_block_filter_notify_recover_matched_blocks() {
]
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_block_filter_without_enough_hashes() {
setup();

let chain = MockChain::new_with_dummy_pow("test-block-filter").start();
let nc = MockNetworkContext::new(SupportProtocols::Filter);

let min_filtered_block_number = 30;
let start_number = min_filtered_block_number + 1;
let proved_number = start_number + 5;
let script = Script::new_builder()
.code_hash(H256(rand::random()).pack())
.build();
chain.client_storage().update_filter_scripts(
vec![ScriptStatus {
script: script.clone(),
script_type: ScriptType::Lock,
block_number: 0,
}],
SetScriptsCommand::All,
);
chain
.client_storage()
.update_min_filtered_block_number(min_filtered_block_number);

chain.mine_to(start_number - 3);

{
let tx = {
let tx = chain.get_cellbase_as_input(start_number - 5);
let output = tx.output(0).unwrap().as_builder().lock(script).build();
tx.as_advanced_builder().set_outputs(vec![output]).build()
};
chain.mine_block(|block| {
let ids = vec![tx.proposal_short_id()];
block.as_advanced_builder().proposals(ids).build()
});
chain.mine_blocks(1);
chain.mine_block(|block| block.as_advanced_builder().transaction(tx.clone()).build());
chain.mine_blocks(1);
}

chain.mine_to(proved_number);

let snapshot = chain.shared().snapshot();

let tip_header: VerifiableHeader = snapshot
.get_verifiable_header_by_number(proved_number)
.expect("block stored")
.into();
chain
.client_storage()
.update_last_state(&U256::one(), &tip_header.header().data(), &[]);

let peer_index = PeerIndex::new(3);
let peers = {
let peers = chain.create_peers();
peers.add_peer(peer_index);
peers.mock_prove_state(peer_index, tip_header).unwrap();
peers.set_max_outbound_peers(3);
peers
};

let filter_data_1 = snapshot.get_block_filter_data(start_number).unwrap();
let filter_data_2 = snapshot.get_block_filter_data(start_number + 1).unwrap();
let block_hash_1 = snapshot.get_block_hash(start_number).unwrap();
let block_hash_2 = snapshot.get_block_hash(start_number + 1).unwrap();
let filter_hashes = {
let mut filter_hashes = snapshot
.get_block_filter_hashes_until(start_number + 3)
.unwrap();
filter_hashes.remove(0);
filter_hashes
};

let content = packed::BlockFilters::new_builder()
.start_number(start_number.pack())
.block_hashes(vec![block_hash_1.clone(), block_hash_2].pack())
.filters(vec![filter_data_1, filter_data_2].pack())
.build();
let message = packed::BlockFilterMessage::new_builder()
.set(content)
.build()
.as_bytes();

let mut protocol = chain.create_filter_protocol(Arc::clone(&peers));
peers.mock_latest_block_filter_hashes(peer_index, 0, filter_hashes);
protocol.received(nc.context(), peer_index, message).await;
assert!(nc.not_banned(peer_index));

assert!(nc.sent_messages().borrow().is_empty());
}
Loading