Skip to content

Commit

Permalink
Merge pull request #2446 from maqi/improve_chunk_proof_verification
Browse files Browse the repository at this point in the history
chore!: improve ChunkProofVerification
  • Loading branch information
maqi authored Nov 26, 2024
2 parents 199dc78 + 8889ef3 commit 11b659b
Show file tree
Hide file tree
Showing 9 changed files with 427 additions and 248 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ fn check_and_wipe_storage_dir_if_necessary(
// * the storage_dir shall be wiped out
// * the version file shall be updated
if cur_version_str != prev_version_str {
warn!("Trying to wipe out storege dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
let _ = fs::remove_dir_all(storage_dir_path);

let mut file = fs::OpenOptions::new()
Expand Down
14 changes: 0 additions & 14 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ pub enum NetworkEvent {
FailedToFetchHolders(BTreeSet<PeerId>),
/// Quotes to be verified
QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
/// Carry out chunk proof check against the specified record and peer
ChunkProofVerification {
peer_id: PeerId,
key_to_verify: NetworkAddress,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -206,15 +201,6 @@ impl Debug for NetworkEvent {
quotes.len()
)
}
NetworkEvent::ChunkProofVerification {
peer_id,
key_to_verify: keys_to_verify,
} => {
write!(
f,
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
}
}
}
Expand Down
112 changes: 4 additions & 108 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError,
NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE,
cmd::NetworkSwarmCmd, log_markers::Marker, MsgResponder, NetworkError, NetworkEvent,
SwarmDriver,
};
use itertools::Itertools;
use libp2p::request_response::{self, Message};
use rand::{rngs::OsRng, thread_rng, Rng};
use sn_protocol::{
messages::{CmdResponse, Request, Response},
storage::RecordType,
Expand Down Expand Up @@ -207,14 +205,10 @@ impl SwarmDriver {
return;
}

let more_than_one_key = incoming_keys.len() > 1;

// On receive a replication_list from a close_group peer, we undertake two tasks:
// On receive a replication_list from a close_group peer, we undertake:
// 1, For those keys that we don't have:
// fetch them if close enough to us
// 2, For those keys that we have and supposed to be held by the sender as well:
// start chunk_proof check against a randomly selected chunk type record to the sender
// 3, For those spends that we have that differ in the hash, we fetch the other version
// 2, For those spends that we have that differ in the hash, we fetch the other version
// and update our local copy.
let all_keys = self
.swarm
Expand All @@ -230,103 +224,5 @@ impl SwarmDriver {
} else {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
}

// Only trigger the chunk_proof check a small percentage of the time
let mut rng = thread_rng();
// 5% probability
if more_than_one_key && rng.gen_bool(0.05) {
self.verify_peer_storage(sender.clone());

// In addition to verifying the sender, we also verify a random close node.
// This is to avoid a malicious node escaping the check by never sending a replication_list.
// This happens with a further reduced probability of 1% (5% * 20%).
if rng.gen_bool(0.2) {
let close_group_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&self.self_peer_id.into())
.map(|peer| peer.into_preimage())
.take(CLOSE_GROUP_SIZE)
.collect_vec();
if close_group_peers.len() == CLOSE_GROUP_SIZE {
loop {
let index: usize = OsRng.gen_range(0..close_group_peers.len());
let candidate = NetworkAddress::from_peer(close_group_peers[index]);
if sender != candidate {
self.verify_peer_storage(candidate);
break;
}
}
}
}
}
}

/// Raise a `NetworkEvent::ChunkProofVerification` against `peer` for one randomly
/// chosen chunk record that we hold locally.
///
/// A local record qualifies as a candidate when it is of `RecordType::Chunk` and `peer`
/// ranks within the `CLOSE_GROUP_SIZE` peers nearest to the record's address, judged from
/// our own view (up to 20 closest local peers plus ourselves).
///
/// No event is sent when `peer` is not a valid `PeerId`, or when there are 50 or fewer
/// candidates.
fn verify_peer_storage(&mut self, peer: NetworkAddress) {
    // Our view of the neighbourhood: the closest local peers to us, plus ourselves.
    let mut neighbourhood = self
        .swarm
        .behaviour_mut()
        .kademlia
        .get_closest_local_peers(&self.self_peer_id.into())
        .map(|entry| entry.into_preimage())
        .take(20)
        .collect_vec();
    neighbourhood.push(self.self_peer_id);

    let Some(target_peer) = peer.as_peer_id() else {
        error!("Target {peer:?} is not a valid PeerId");
        return;
    };

    let stored_records = self
        .swarm
        .behaviour_mut()
        .kademlia
        .store_mut()
        .record_addresses_ref();

    // Targeted chunk type record shall be expected within the close range from our perspective.
    let mut verify_candidates: Vec<NetworkAddress> = Vec::new();
    for (addr, record_type) in stored_records.values() {
        if RecordType::Chunk != *record_type {
            continue;
        }

        match sort_peers_by_address(&neighbourhood, addr, CLOSE_GROUP_SIZE) {
            Ok(close_group) => {
                if close_group.contains(&&target_peer) {
                    verify_candidates.push(addr.clone());
                }
            }
            Err(err) => {
                warn!("Could not get sorted peers for {addr:?} with error {err:?}");
            }
        }
    }

    // Nearest-to-peer candidates first, so the random pick below can be limited to them.
    verify_candidates.sort_by_key(|candidate| peer.distance(candidate));

    // To ensure the candidate must have to be held by the peer,
    // we only carry out check when there are already certain amount of chunks uploaded
    // AND choose candidate from certain reduced range.
    if verify_candidates.len() <= 50 {
        debug!("No valid candidate to be checked against peer {peer:?}");
        return;
    }

    // Draw only from the half of the candidates that is nearest to the peer.
    let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2));
    self.send_event(NetworkEvent::ChunkProofVerification {
        peer_id: target_peer,
        key_to_verify: verify_candidates[index].clone(),
    });
}
}
35 changes: 29 additions & 6 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl Network {
}

/// Get the Chunk existence proof from the close nodes to the provided chunk address.
/// This is to be used by client only to verify the success of the upload.
pub async fn verify_chunk_existence(
&self,
chunk_address: NetworkAddress,
Expand Down Expand Up @@ -304,21 +305,30 @@ impl Network {
let request = Request::Query(Query::GetChunkExistenceProof {
key: chunk_address.clone(),
nonce,
difficulty: 1,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) =
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
resp
{
if expected_proof.verify(&proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
if proofs.is_empty() {
warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
None
} else if let Ok(ref proof) = proofs[0].1 {
if expected_proof.verify(proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
None
}
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
None
}
} else {
Expand Down Expand Up @@ -370,7 +380,12 @@ impl Network {
return Err(NetworkError::NoStoreCostResponses);
}

let request = Request::Query(Query::GetStoreCost(record_address.clone()));
// Client shall decide whether to carry out storage verification or not.
let request = Request::Query(Query::GetStoreCost {
key: record_address.clone(),
nonce: None,
difficulty: 0,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
Expand All @@ -388,7 +403,11 @@ impl Network {
quote: Ok(quote),
payment_address,
peer_address,
storage_proofs,
}) => {
if !storage_proofs.is_empty() {
debug!("Storage proofing during GetStoreCost to be implemented.");
}
// Check the quote itself is valid.
if quote.cost
!= AttoTokens::from_u64(calculate_cost_for_records(
Expand All @@ -406,7 +425,11 @@ impl Network {
quote: Err(ProtocolError::RecordExists(_)),
payment_address,
peer_address,
storage_proofs,
}) => {
if !storage_proofs.is_empty() {
debug!("Storage proofing during GetStoreCost to be implemented.");
}
all_costs.push((peer_address, payment_address, PaymentQuote::zero()));
}
_ => {
Expand Down
1 change: 1 addition & 0 deletions sn_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ futures = "~0.3.13"
hex = "~0.4.3"
itertools = "~0.12.1"
libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] }
num-traits = "0.2"
prometheus-client = { version = "0.22", optional = true }
# watch out updating this, protoc compiler needs to be installed on all build systems
# arm builds + musl are very problematic
Expand Down
Loading

0 comments on commit 11b659b

Please sign in to comment.