diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs index 5a8999703f..08131897a7 100644 --- a/sn_networking/src/event/request_response.rs +++ b/sn_networking/src/event/request_response.rs @@ -7,12 +7,10 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError, - NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, + cmd::NetworkSwarmCmd, log_markers::Marker, MsgResponder, NetworkError, NetworkEvent, + SwarmDriver, }; -use itertools::Itertools; use libp2p::request_response::{self, Message}; -use rand::{rngs::OsRng, thread_rng, Rng}; use sn_protocol::{ messages::{CmdResponse, Request, Response}, storage::RecordType, @@ -210,14 +208,10 @@ impl SwarmDriver { return; } - let more_than_one_key = incoming_keys.len() > 1; - - // On receive a replication_list from a close_group peer, we undertake two tasks: + // On receive a replication_list from a close_group peer, we undertake: // 1, For those keys that we don't have: // fetch them if close enough to us - // 2, For those keys that we have and supposed to be held by the sender as well: - // start chunk_proof check against a randomly selected chunk type record to the sender - // 3, For those spends that we have that differ in the hash, we fetch the other version + // 2, For those spends that we have that differ in the hash, we fetch the other version // and update our local copy. let all_keys = self .swarm @@ -233,103 +227,5 @@ impl SwarmDriver { } else { self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); } - - // Only trigger chunk_proof check based every X% of the time - let mut rng = thread_rng(); - // 5% probability - if more_than_one_key && rng.gen_bool(0.05) { - self.verify_peer_storage(sender.clone()); - - // In additon to verify the sender, we also verify a random close node. - // This is to avoid malicious node escaping the check by never send a replication_list. - // With further reduced probability of 1% (5% * 20%) - if rng.gen_bool(0.2) { - let close_group_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(CLOSE_GROUP_SIZE) - .collect_vec(); - if close_group_peers.len() == CLOSE_GROUP_SIZE { - loop { - let index: usize = OsRng.gen_range(0..close_group_peers.len()); - let candidate = NetworkAddress::from_peer(close_group_peers[index]); - if sender != candidate { - self.verify_peer_storage(candidate); - break; - } - } - } - } - } - } - - /// Check among all chunk type records that we have, select those close to the peer, - /// and randomly pick one as the verification candidate. - fn verify_peer_storage(&mut self, peer: NetworkAddress) { - let mut closest_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(20) - .collect_vec(); - closest_peers.push(self.self_peer_id); - - let target_peer = if let Some(peer_id) = peer.as_peer_id() { - peer_id - } else { - error!("Target {peer:?} is not a valid PeerId"); - return; - }; - - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - - // Targeted chunk type record shall be expected within the close range from our perspective. - let mut verify_candidates: Vec = all_keys - .values() - .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { - match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { - Ok(close_group) => { - if close_group.contains(&&target_peer) { - Some(addr.clone()) - } else { - None - } - } - Err(err) => { - warn!("Could not get sorted peers for {addr:?} with error {err:?}"); - None - } - } - } else { - None - } - }) - .collect(); - - verify_candidates.sort_by_key(|a| peer.distance(a)); - - // To ensure the candidate must have to be held by the peer, - // we only carry out check when there are already certain amount of chunks uploaded - // AND choose candidate from certain reduced range. - if verify_candidates.len() > 50 { - let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); - self.send_event(NetworkEvent::ChunkProofVerification { - peer_id: target_peer, - key_to_verify: verify_candidates[index].clone(), - }); - } else { - debug!("No valid candidate to be checked against peer {peer:?}"); - } } } diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index b7118d18a3..988351c4be 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -269,6 +269,7 @@ impl Network { } /// Get the Chunk existence proof from the close nodes to the provided chunk address. + /// This is to be used by client only to verify the success of the upload. pub async fn verify_chunk_existence( &self, chunk_address: NetworkAddress, @@ -304,6 +305,7 @@ impl Network { let request = Request::Query(Query::GetChunkExistenceProof { key: chunk_address.clone(), nonce, + expected_answers: 1, }); let responses = self .send_and_get_responses(&close_nodes, &request, true) @@ -311,14 +313,22 @@ impl Network { let n_verified = responses .into_iter() .filter_map(|(peer, resp)| { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = + if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) = resp { - if expected_proof.verify(&proof) { - debug!("Got a valid ChunkProof from {peer:?}"); - Some(()) + if proofs.is_empty() { + warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty."); + None + } else if let Ok(ref proof) = proofs[0].1 { + if expected_proof.verify(proof) { + debug!("Got a valid ChunkProof from {peer:?}"); + Some(()) + } else { + warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + None + } } else { - warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1); None } } else { diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index bff4266b6b..8055cd7230 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -13,18 +13,22 @@ use super::{ use crate::metrics::NodeMetricsRecorder; use crate::RunningNode; use bytes::Bytes; +use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; -use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; +use rand::{ + rngs::{OsRng, StdRng}, + thread_rng, Rng, SeedableRng, +}; use sn_evm::{AttoTokens, RewardsAddress}; #[cfg(feature = "open-metrics")] use sn_networking::MetricsRegistries; use sn_networking::{ - close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, - SwarmDriver, + close_group_majority, Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver, }; use sn_protocol::{ error::Error as ProtocolError, - messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response}, + messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, + storage::RecordType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ @@ -51,12 +55,9 @@ pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180; /// This is the max time it should take. Minimum interval at any node will be half this const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600; -/// Max number of attempts that chunk proof verification will be carried out against certain target, -/// before classifying peer as a bad peer. -const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3; - -/// Interval between chunk proof verification to be retired against the same target. -const CHUNK_PROOF_VERIFY_RETRY_INTERVAL: Duration = Duration::from_secs(15); +/// Interval to trigger storage challenge. +/// This is the max time it should take. Minimum interval at any node will be half this +const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200; /// Interval to update the nodes uptime metric const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); @@ -277,6 +278,17 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + // use a random neighbour storege challenge ticker to ensure + // neighbour do not carryout challenges at the same time + let storage_challenge_interval: u64 = + rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S); + let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval); + debug!("Storage challenge interval set to {storage_challenge_interval_time:?}"); + + let mut storage_challenge_interval = + tokio::time::interval(storage_challenge_interval_time); + let _ = storage_challenge_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -341,6 +353,17 @@ impl Node { Self::trigger_irrelevant_record_cleanup(network); }); } + // runs every storage_challenge_interval time + _ = storage_challenge_interval.tick() => { + let start = Instant::now(); + debug!("Periodic storage challenge triggered"); + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::storage_challenge(network).await; + trace!("Periodic storege challenge took {:?}", start.elapsed()); + }); + } } } }); @@ -491,28 +514,16 @@ impl Node { event_header = "ChunkProofVerification"; let network = self.network().clone(); - debug!("Going to verify chunk {key_to_verify} against peer {peer_id:?}"); + debug!("Going to carry out storage existence check against peer {peer_id:?}"); let _handle = spawn(async move { - // To avoid the peer is in the process of getting the copy via replication, - // repeat the verification for couple of times (in case of error). - // Only report the node as bad when ALL the verification attempts failed. - let mut attempts = 0; - while attempts < MAX_CHUNK_PROOF_VERIFY_ATTEMPTS { - if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { - return; - } - // Replication interval is 22s - 45s. - // Hence some re-try erquired to allow copies to spread out. - tokio::time::sleep(CHUNK_PROOF_VERIFY_RETRY_INTERVAL).await; - attempts += 1; + if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { + return; } - // Now ALL attempts failed, hence report the issue. - // Note this won't immediately trigger the node to be considered as BAD. - // Only the same peer accumulated three same issue - // within 5 mins will be considered as BAD. - // As the chunk_proof_check will be triggered every periodical replication, - // a low performed or cheaty peer will raise multiple issue alerts during it. + info!("Peer {peer_id:?} failed storage existence challenge."); + // TODO: shall challenge failure immediately triggers the node to be removed? + // or to lower connection score once feature introduced. + // If score falls too low, sever connection. network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); }); } @@ -675,21 +686,17 @@ impl Node { QueryResponse::GetReplicatedRecord(result) } - Query::GetChunkExistenceProof { key, nonce } => { - debug!("Got GetChunkExistenceProof for chunk {key:?}"); - - let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); - if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { - let proof = ChunkProof::new(&record.value, nonce); - debug!("Chunk proof for {key:?} is {proof:?}"); - result = Ok(proof) - } else { - debug!( - "Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - } + Query::GetChunkExistenceProof { + key, + nonce, + expected_answers, + } => { + debug!("Got GetChunkExistenceProof targeting chunk {key:?} with {expected_answers} answers."); - QueryResponse::GetChunkExistenceProof(result) + QueryResponse::GetChunkExistenceProof( + Self::respond_x_closest_chunk_proof(network, key, nonce, expected_answers) + .await, + ) } Query::CheckNodeInProblem(target_address) => { debug!("Got CheckNodeInProblem for peer {target_address:?}"); @@ -712,6 +719,126 @@ impl Node { Response::Query(resp) } + async fn respond_x_closest_chunk_proof( + network: &Network, + key: NetworkAddress, + nonce: Nonce, + expected_answers: usize, + ) -> Vec<(NetworkAddress, Result)> { + let mut results = vec![]; + if expected_answers == 1 { + // Client checking existence of published chunk. + let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); + if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + result = Ok(proof) + } else { + debug!("Could not get ChunkProof for {key:?} as we don't have the record locally."); + } + + results.push((key.clone(), result)); + } else { + let all_local_records = network.get_all_local_record_addresses().await; + + if let Ok(all_local_records) = all_local_records { + // Only `ChunkRecord`s can be consistantly verified + let mut all_chunk_addrs: Vec<_> = all_local_records + .iter() + .filter_map(|(addr, record_type)| { + if *record_type == RecordType::Chunk { + Some(addr.clone()) + } else { + None + } + }) + .collect(); + + // Sort by distance and only take first X closest entries + all_chunk_addrs.sort_by_key(|addr| key.distance(addr)); + + // TODO: this shall be deduced from resource usage dynamically + let workload_factor = std::cmp::min(expected_answers, CLOSE_GROUP_SIZE); + + for addr in all_chunk_addrs.iter().take(workload_factor) { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await + { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + results.push((addr.clone(), Ok(proof))); + } + } + } + } + + debug!( + "Respond with {} answers to ChunkProofChallenge targeting {key:?}.", + results.len() + ); + + results + } + + /// Check among all chunk type records that we have, + /// and randomly pick one as the verification candidate. + /// This will challenge all closest peers at once. + async fn storage_challenge(network: Network) { + let closest_peers: Vec = + if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await { + closest_peers + .into_iter() + .take(CLOSE_GROUP_SIZE) + .collect_vec() + } else { + error!("Cannot get local neighbours"); + return; + }; + if closest_peers.len() < CLOSE_GROUP_SIZE { + debug!( + "Not enough neighbours ({}/{}) to carry out storage challenge.", + closest_peers.len(), + CLOSE_GROUP_SIZE + ); + return; + } + + let verify_candidates: Vec = + if let Ok(all_keys) = network.get_all_local_record_addresses().await { + all_keys + .iter() + .filter_map(|(addr, record_type)| { + if RecordType::Chunk == *record_type { + Some(addr.clone()) + } else { + None + } + }) + .collect() + } else { + error!("Failed to get local record addresses."); + return; + }; + let num_of_targets = verify_candidates.len(); + if num_of_targets < 50 { + debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours."); + return; + } + + // TODO: launch the challenges parrallely, so that a scoring scheme can be utilized. + for peer_id in closest_peers { + if peer_id == network.peer_id() { + continue; + } + + let index: usize = OsRng.gen_range(0..num_of_targets); + if !chunk_proof_verify_peer(&network, peer_id, &verify_candidates[index]).await { + info!("Peer {peer_id:?} failed storage challenge."); + // TODO: shall the challenge failure immediately triggers the node to be removed? + network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + } + } + } + async fn try_bad_nodes_check(network: Network, rolling_index: usize) { if let Ok(kbuckets) = network.get_kbuckets().await { let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum(); @@ -770,58 +897,51 @@ impl Node { } async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let check_passed = if let Ok(Some(record)) = - network.get_local_record(&key.to_record_key()).await - { - let nonce = thread_rng().gen::(); - let expected_proof = ChunkProof::new(&record.value, nonce); - debug!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}"); + let nonce: Nonce = thread_rng().gen::(); - let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), - nonce, - }); - let responses = network - .send_and_get_responses(&[peer_id], &request, true) - .await; - let n_verified = responses - .into_iter() - .filter_map(|(peer, resp)| received_valid_chunk_proof(key, &expected_proof, peer, resp)) - .count(); - - n_verified >= 1 - } else { - error!( - "To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - true - }; + let request = Request::Query(Query::GetChunkExistenceProof { + key: key.clone(), + nonce, + expected_answers: CLOSE_GROUP_SIZE, + }); - if !check_passed { - return false; - } + let responses = network + .send_and_get_responses(&[peer_id], &request, true) + .await; - true -} + // TODO: cross check with local knowledge (i.e. the claimed closest shall match locals) + // this also prevent peer falsely give empty or non-existent answers. -fn received_valid_chunk_proof( - key: &NetworkAddress, - expected_proof: &ChunkProof, - peer: PeerId, - resp: Result, -) -> Option<()> { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp { - if expected_proof.verify(&proof) { - debug!( - "Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check." - ); - Some(()) - } else { - warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?"); - None + if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = + responses.get(&peer_id) + { + if answers.is_empty() { + info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); + return false; + } + for (addr, proof) in answers { + if let Ok(proof) = proof { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let expected_proof = ChunkProof::new(&record.value, nonce); + // Any wrong answer shall be considered as a failure + if *proof != expected_proof { + return false; + } + } else { + debug!( + "Could not get ChunkProof for {addr:?} as we don't have the record locally." + ); + } + } else { + debug!( + "Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}" + ); + } } } else { - debug!("Did not get a valid response for the ChunkProof from {peer:?}"); - None + info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error."); + return false; } + + true } diff --git a/sn_protocol/src/messages/query.rs b/sn_protocol/src/messages/query.rs index b28f6830fa..43f8b4e97c 100644 --- a/sn_protocol/src/messages/query.rs +++ b/sn_protocol/src/messages/query.rs @@ -47,6 +47,9 @@ pub enum Query { key: NetworkAddress, /// The random nonce that the node uses to produce the Proof (i.e., hash(record+nonce)) nonce: Nonce, + /// Expected number of answers. For client publish verification, use 1 for efficiency. + /// Node shall try their best to fulfill the number, based on their capacity. + expected_answers: usize, }, /// Queries close_group peers whether the target peer is a bad_node CheckNodeInProblem(NetworkAddress), @@ -78,8 +81,15 @@ impl std::fmt::Display for Query { Query::GetRegisterRecord { key, requester } => { write!(f, "Query::GetRegisterRecord({requester:?} {key:?})") } - Query::GetChunkExistenceProof { key, nonce } => { - write!(f, "Query::GetChunkExistenceProof({key:?} {nonce:?})") + Query::GetChunkExistenceProof { + key, + nonce, + expected_answers, + } => { + write!( + f, + "Query::GetChunkExistenceProof({key:?} {nonce:?} {expected_answers})" + ) } Query::CheckNodeInProblem(address) => { write!(f, "Query::CheckNodeInProblem({address:?})") diff --git a/sn_protocol/src/messages/response.rs b/sn_protocol/src/messages/response.rs index 17c986f581..44e9932c23 100644 --- a/sn_protocol/src/messages/response.rs +++ b/sn_protocol/src/messages/response.rs @@ -56,7 +56,7 @@ pub enum QueryResponse { /// Response to [`GetChunkExistenceProof`] /// /// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof - GetChunkExistenceProof(Result), + GetChunkExistenceProof(Vec<(NetworkAddress, Result)>), } // Debug implementation for QueryResponse, to avoid printing Vec @@ -109,8 +109,9 @@ impl Debug for QueryResponse { write!(f, "GetRegisterRecord(Err({err:?}))") } }, - QueryResponse::GetChunkExistenceProof(proof) => { - write!(f, "GetChunkExistenceProof(proof: {proof:?})") + QueryResponse::GetChunkExistenceProof(proofs) => { + let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect(); + write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})") } } }