diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 98ee999b06..1b5395b028 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -531,15 +531,19 @@ jobs: # platform: ${{ matrix.os }} # build: true - # - name: Check SAFE_PEERS was set - # shell: bash - # run: | - # if [[ -z "$SAFE_PEERS" ]]; then - # echo "The SAFE_PEERS variable has not been set" - # exit 1 - # else - # echo "SAFE_PEERS has been set to $SAFE_PEERS" - # fi + # # incase the faucet is not ready yet + # - name: 30s sleep for faucet completion + # run: sleep 30 + + # - name: Check SAFE_PEERS was set + # shell: bash + # run: | + # if [[ -z "$SAFE_PEERS" ]]; then + # echo "The SAFE_PEERS variable has not been set" + # exit 1 + # else + # echo "SAFE_PEERS has been set to $SAFE_PEERS" + # fi # - name: execute token_distribution tests # run: cargo test --release --features=local,distribution token_distribution -- --nocapture --test-threads=1 @@ -631,7 +635,37 @@ jobs: log_file_prefix: safe_test_logs_churn platform: ${{ matrix.os }} - - name: Verify restart of nodes using rg + - name: Get total node count + shell: bash + timeout-minutes: 1 + run: | + node_count=$(ls "${{ matrix.node_data_path }}" | wc -l) + echo "Node dir count is $node_count" + + - name: Get restart of nodes using rg + shell: bash + timeout-minutes: 1 + # get the counts, then the specific line, and then the digit count only + # then check we have an expected level of restarts + # TODO: make this use an env var, or relate to testnet size + run: | + restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \ + rg "(\d+) matches" | rg "\d+" -o) + echo "Restarted $restart_count nodes" + + - name: Get peers removed from nodes using rg + shell: bash + timeout-minutes: 1 + run: | + peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \ + rg "(\d+) matches" | rg "\d+" -o) || { echo "Failed to extract peer removal count"; exit 1; } + if [ -z "$peer_removed" ]; then + echo "No peer removal count found" + exit 1 + fi + echo "PeerRemovedFromRoutingTable $peer_removed times" + + - name: Verify peers removed exceed restarted node counts shell: bash timeout-minutes: 1 # get the counts, then the specific line, and then the digit count only @@ -648,8 +682,6 @@ jobs: echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count" exit 1 fi - node_count=$(ls "${{ matrix.node_data_path }}" | wc -l) - echo "Node dir count is $node_count" # TODO: reenable this once the testnet dir creation is tidied up to avoid a large count here # if [ $restart_count -lt $node_count ]; then @@ -768,7 +800,7 @@ jobs: uses: maidsafe/sn-local-testnet-action@main with: action: stop - log_file_prefix: safe_test_logs_data_location + log_file_prefix: safe_test_logs_data_location_routing_table platform: ${{ matrix.os }} - name: Verify restart of nodes using rg @@ -860,15 +892,15 @@ jobs: # echo "SAFE_PEERS has been set to $SAFE_PEERS" # fi - # - name: Create and fund a wallet first time - # run: | - # ~/safe --log-output-dest=data-dir wallet create --no-password - # ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 1>first.txt - # echo "----------" - # cat first.txt - # env: - # SN_LOG: "all" - # timeout-minutes: 5 + # - name: Create and fund a wallet first time + # run: | + # ~/safe --log-output-dest=data-dir wallet create --no-password + # ~/faucet --log-output-dest=data-dir 
send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 1>first.txt + # echo "----------" + # cat first.txt + # env: + # SN_LOG: "all" + # timeout-minutes: 5 # - name: Move faucet log to the working folder # run: | @@ -894,44 +926,64 @@ jobs: # continue-on-error: true # if: always() - # - name: Create and fund a wallet second time - # run: | - # ls -l /home/runner/.local/share - # ls -l /home/runner/.local/share/safe - # rm -rf /home/runner/.local/share/safe/test_faucet - # rm -rf /home/runner/.local/share/safe/test_genesis - # rm -rf /home/runner/.local/share/safe/client - # ~/safe --log-output-dest=data-dir wallet create --no-password - # ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 1>second.txt - # echo "----------" - # cat second.txt - # if grep "genesis is already spent" second.txt; then - # echo "Duplicated faucet rejected" - # else - # echo "Duplicated faucet not rejected!" - # exit 1 - # fi - # env: - # SN_LOG: "all" - # timeout-minutes: 5 - - # - name: Create and fund a wallet with different keypair - # run: | - # ls -l /home/runner/.local/share - # ls -l /home/runner/.local/share/safe - # rm -rf /home/runner/.local/share/safe/test_faucet - # rm -rf /home/runner/.local/share/safe/test_genesis - # rm -rf /home/runner/.local/share/safe/client - # ~/safe --log-output-dest=data-dir wallet create --no-password - # if GENESIS_PK=a9925296499299fdbf4412509d342a92e015f5b996e9acd1d2ab7f2326e3ad05934326efdc345345a95e973ac1bb6637 GENESIS_SK=40f6bbc870355c68138ac70b450b6425af02b49874df3f141b7018378ceaac66 nohup ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1); then - # echo "Faucet with different genesis key not rejected!" - # exit 1 - # else - # echo "Faucet with different genesis key rejected" - # fi - # env: - # SN_LOG: "all" - # timeout-minutes: 5 + # - name: Cleanup prior faucet and cashnotes + # run: | + # ls -l /home/runner/.local/share + # ls -l /home/runner/.local/share/safe + # rm -rf /home/runner/.local/share/safe/test_faucet + # rm -rf /home/runner/.local/share/safe/test_genesis + # rm -rf /home/runner/.local/share/safe/client + # env: + # SN_LOG: "all" + # timeout-minutes: 5 + + # - name: Create a new wallet + # run: ~/safe --log-output-dest=data-dir wallet create --no-password + # env: + # SN_LOG: "all" + # timeout-minutes: 5 + + # - name: Attempt second faucet genesis disbursement + # run: ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) > second.txt 2>&1 || true + # env: + # SN_LOG: "all" + # timeout-minutes: 5 + + # - name: cat second.txt + # run: cat second.txt + # env: + # SN_LOG: "all" + # timeout-minutes: 5 + + # - name: Verify a second disbursement is rejected + # run: | + # if grep "Faucet disbursement has already occured" second.txt; then + # echo "Duplicated faucet rejected" + # else + # echo "Duplicated faucet not rejected!" 
+ # exit 1 + # fi + # env: + # SN_LOG: "all" + # timeout-minutes: 5 + + # - name: Create and fund a wallet with different keypair + # run: | + # ls -l /home/runner/.local/share + # ls -l /home/runner/.local/share/safe + # rm -rf /home/runner/.local/share/safe/test_faucet + # rm -rf /home/runner/.local/share/safe/test_genesis + # rm -rf /home/runner/.local/share/safe/client + # ~/safe --log-output-dest=data-dir wallet create --no-password + # if GENESIS_PK=a9925296499299fdbf4412509d342a92e015f5b996e9acd1d2ab7f2326e3ad05934326efdc345345a95e973ac1bb6637 GENESIS_SK=40f6bbc870355c68138ac70b450b6425af02b49874df3f141b7018378ceaac66 nohup ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1); then + # echo "Faucet with different genesis key not rejected!" + # exit 1 + # else + # echo "Faucet with different genesis key rejected" + # fi + # env: + # SN_LOG: "all" + # timeout-minutes: 5 # - name: Build faucet binary again without the gifting feature # run: cargo build --release --bin faucet @@ -1059,14 +1111,14 @@ jobs: # echo "PWD subdirs:" # du -sh */ - # - name: Create and fund a wallet to pay for files storage - # run: | - # ~/safe --log-output-dest=data-dir wallet create --no-password - # ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex - # ~/safe --log-output-dest=data-dir wallet receive --file transfer_hex - # env: - # SN_LOG: "all" - # timeout-minutes: 5 + # - name: Create and fund a wallet to pay for files storage + # run: | + # ~/safe --log-output-dest=data-dir wallet create --no-password + # ~/faucet --log-output-dest=data-dir send 100000000 $(~/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex + # ~/safe --log-output-dest=data-dir wallet receive --file transfer_hex + # env: + # SN_LOG: "all" + # timeout-minutes: 5 # - name: Start a client to upload # run: ~/safe --log-output-dest=data-dir files upload "ubuntu-14.04.6-desktop-i386.iso" --retry-strategy quick @@ -1116,6 +1168,8 @@ jobs: # if: "!startsWith(github.event.head_commit.message, 'chore(release):')" # name: Replication bench with heavy upload # runs-on: ubuntu-latest + # env: + # CLIENT_DATA_PATH: /home/runner/.local/share/safe/client # steps: # - uses: actions/checkout@v4 @@ -1192,14 +1246,28 @@ jobs: # echo "SAFE_PEERS has been set to $SAFE_PEERS" # fi - # - name: Create and fund a wallet to pay for files storage - # run: | - # ./target/release/safe --log-output-dest=data-dir wallet create --no-password - # ./target/release/faucet --log-output-dest=data-dir send 100000000 $(./target/release/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex - # ./target/release/safe --log-output-dest=data-dir wallet receive --file transfer_hex - # env: - # SN_LOG: "all" - # timeout-minutes: 5 + # - name: Sleep 15s + # shell: bash + # run: sleep 15 + + # - name: Check faucet has been funded + # shell: bash + # run: | + # cash_note_count=$(ls -l /home/runner/.local/share/safe/test_faucet/wallet/cash_notes/ | wc -l) + # echo $cash_note_count + # if [ "$cash_note_count" -eq 0 ]; then + # echo "Error: Expected at least 1 cash note, but found $cash_note_count" + # exit 1 + # fi + + # - name: Create and fund a wallet to pay for files storage + # run: | + # ./target/release/safe --log-output-dest=data-dir wallet create --no-password + # ./target/release/faucet --log-output-dest=data-dir send 100000000 $(./target/release/safe 
--log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex + # ./target/release/safe --log-output-dest=data-dir wallet receive --file transfer_hex + # env: + # SN_LOG: "all" + # timeout-minutes: 5 # - name: Start a client to upload first file # run: ./target/release/safe --log-output-dest=data-dir files upload "./test_data_1.tar.gz" --retry-strategy quick @@ -1207,29 +1275,32 @@ jobs: # SN_LOG: "all" # timeout-minutes: 5 - # - name: Ensure no leftover cash_notes and payment files - # run: | - # expected_cash_notes_files="1" - # expected_payment_files="0" - # pwd - # ls $CLIENT_DATA_PATH/ -l - # ls $CLIENT_DATA_PATH/wallet -l - # ls $CLIENT_DATA_PATH/wallet/cash_notes -l - # cash_note_files=$(ls $CLIENT_DATA_PATH/wallet/cash_notes | wc -l) - # echo "Find $cash_note_files cash_note files" - # if [ $expected_cash_notes_files -lt $cash_note_files ]; then - # echo "Got too many cash_note files leftover: $cash_note_files" - # exit 1 - # fi - # ls $CLIENT_DATA_PATH/wallet/payments -l - # payment_files=$(ls $CLIENT_DATA_PATH/wallet/payments | wc -l) - # if [ $expected_payment_files -lt $payment_files ]; then - # echo "Got too many payment files leftover: $payment_files" - # exit 1 - # fi - # env: - # CLIENT_DATA_PATH: /home/runner/.local/share/safe/client - # timeout-minutes: 10 + # - name: Check current directories + # run: | + # pwd + # ls $CLIENT_DATA_PATH/ -l + # ls $CLIENT_DATA_PATH/wallet -l + # ls $CLIENT_DATA_PATH/wallet/cash_notes -l + # timeout-minutes: 1 + + # - name: Ensure no leftover cash_notes and payment files + # run: | + # expected_cash_notes_files="1" + # expected_payment_files="0" + # cash_note_files=$(ls $CLIENT_DATA_PATH/wallet/cash_notes | wc -l) + # echo "Find $cash_note_files cash_note files" + # if [ $expected_cash_notes_files -lt $cash_note_files ]; then + # echo "Got too many cash_note files leftover: $cash_note_files" + # exit 1 + # fi + # ls $CLIENT_DATA_PATH/wallet/payments -l + # payment_files=$(ls $CLIENT_DATA_PATH/wallet/payments | wc -l) + # if [ $expected_payment_files -lt $payment_files ]; then + # echo "Got too many payment files leftover: $payment_files" + # exit 1 + # fi + + # timeout-minutes: 10 # - name: Wait for certain period # run: sleep 300 @@ -1241,52 +1312,49 @@ jobs: # SN_LOG: "all" # timeout-minutes: 10 - # - name: Ensure no leftover cash_notes and payment files - # run: | - # expected_cash_notes_files="1" - # expected_payment_files="0" - # pwd - # ls $CLIENT_DATA_PATH/ -l - # ls $CLIENT_DATA_PATH/wallet -l - # ls $CLIENT_DATA_PATH/wallet/cash_notes -l - # cash_note_files=$(find $CLIENT_DATA_PATH/wallet/cash_notes -type f | wc -l) - # if (( $(echo "$cash_note_files > $expected_cash_notes_files" | bc -l) )); then - # echo "Got too many cash_note files leftover: $cash_note_files when we expected $expected_cash_notes_files" - # exit 1 - # fi - # ls $CLIENT_DATA_PATH/wallet/payments -l - # payment_files=$(find $CLIENT_DATA_PATH/wallet/payments -type f | wc -l) - # if (( $(echo "$payment_files > $expected_payment_files" | bc -l) )); then - # echo "Got too many payment files leftover: $payment_files" - # exit 1 - # fi - # env: - # CLIENT_DATA_PATH: /home/runner/.local/share/safe/client - # timeout-minutes: 10 + # - name: Ensure no leftover cash_notes and payment files + # run: | + # expected_cash_notes_files="1" + # expected_payment_files="0" + # pwd + # ls $CLIENT_DATA_PATH/ -l + # ls $CLIENT_DATA_PATH/wallet -l + # ls $CLIENT_DATA_PATH/wallet/cash_notes -l + # cash_note_files=$(find $CLIENT_DATA_PATH/wallet/cash_notes -type f | 
wc -l) + # if (( $(echo "$cash_note_files > $expected_cash_notes_files" | bc -l) )); then + # echo "Got too many cash_note files leftover: $cash_note_files when we expected $expected_cash_notes_files" + # exit 1 + # fi + # ls $CLIENT_DATA_PATH/wallet/payments -l + # payment_files=$(find $CLIENT_DATA_PATH/wallet/payments -type f | wc -l) + # if (( $(echo "$payment_files > $expected_payment_files" | bc -l) )); then + # echo "Got too many payment files leftover: $payment_files" + # exit 1 + # fi + # timeout-minutes: 10 # - name: Wait for certain period # run: sleep 300 # timeout-minutes: 6 - # # Start a different client to avoid local wallet slow down with more payments handled. - # - name: Start a different client - # run: | - # pwd - # mv $CLIENT_DATA_PATH $SAFE_DATA_PATH/client_first - # ls -l $SAFE_DATA_PATH - # ls -l $SAFE_DATA_PATH/client_first - # mkdir $SAFE_DATA_PATH/client - # ls -l $SAFE_DATA_PATH - # mv $SAFE_DATA_PATH/client_first/logs $CLIENT_DATA_PATH/logs - # ls -l $CLIENT_DATA_PATH - # ./target/release/safe --log-output-dest=data-dir wallet create --no-password - # ./target/release/faucet --log-output-dest=data-dir send 100000000 $(./target/release/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex - # ./target/release/safe --log-output-dest=data-dir wallet receive --file transfer_hex - # env: - # SN_LOG: "all" - # SAFE_DATA_PATH: /home/runner/.local/share/safe - # CLIENT_DATA_PATH: /home/runner/.local/share/safe/client - # timeout-minutes: 25 + # # Start a different client to avoid local wallet slow down with more payments handled. + # - name: Start a different client + # run: | + # pwd + # mv $CLIENT_DATA_PATH $SAFE_DATA_PATH/client_first + # ls -l $SAFE_DATA_PATH + # ls -l $SAFE_DATA_PATH/client_first + # mkdir $SAFE_DATA_PATH/client + # ls -l $SAFE_DATA_PATH + # mv $SAFE_DATA_PATH/client_first/logs $CLIENT_DATA_PATH/logs + # ls -l $CLIENT_DATA_PATH + # ./target/release/safe --log-output-dest=data-dir wallet create --no-password + # ./target/release/faucet --log-output-dest=data-dir send 100000000 $(./target/release/safe --log-output-dest=data-dir wallet address | tail -n 1) | tail -n 1 > transfer_hex + # ./target/release/safe --log-output-dest=data-dir wallet receive --file transfer_hex + # env: + # SN_LOG: "all" + # SAFE_DATA_PATH: /home/runner/.local/share/safe + # timeout-minutes: 25 # - name: Use second client to upload third file # run: ./target/release/safe --log-output-dest=data-dir files upload "./test_data_3.tar.gz" --retry-strategy quick @@ -1294,29 +1362,27 @@ jobs: # SN_LOG: "all" # timeout-minutes: 10 - # - name: Ensure no leftover cash_notes and payment files - # run: | - # expected_cash_notes_files="1" - # expected_payment_files="0" - # pwd - # ls $CLIENT_DATA_PATH/ -l - # ls $CLIENT_DATA_PATH/wallet -l - # ls $CLIENT_DATA_PATH/wallet/cash_notes -l - # cash_note_files=$(ls $CLIENT_DATA_PATH/wallet/cash_notes | wc -l) - # echo "Find $cash_note_files cash_note files" - # if [ $expected_cash_notes_files -lt $cash_note_files ]; then - # echo "Got too many cash_note files leftover: $cash_note_files" - # exit 1 - # fi - # ls $CLIENT_DATA_PATH/wallet/payments -l - # payment_files=$(ls $CLIENT_DATA_PATH/wallet/payments | wc -l) - # if [ $expected_payment_files -lt $payment_files ]; then - # echo "Got too many payment files leftover: $payment_files" - # exit 1 - # fi - # env: - # CLIENT_DATA_PATH: /home/runner/.local/share/safe/client - # timeout-minutes: 10 + # - name: Ensure no leftover cash_notes and payment files + # run: | + 
# expected_cash_notes_files="1" + # expected_payment_files="0" + # pwd + # ls $CLIENT_DATA_PATH/ -l + # ls $CLIENT_DATA_PATH/wallet -l + # ls $CLIENT_DATA_PATH/wallet/cash_notes -l + # cash_note_files=$(ls $CLIENT_DATA_PATH/wallet/cash_notes | wc -l) + # echo "Find $cash_note_files cash_note files" + # if [ $expected_cash_notes_files -lt $cash_note_files ]; then + # echo "Got too many cash_note files leftover: $cash_note_files" + # exit 1 + # fi + # ls $CLIENT_DATA_PATH/wallet/payments -l + # payment_files=$(ls $CLIENT_DATA_PATH/wallet/payments | wc -l) + # if [ $expected_payment_files -lt $payment_files ]; then + # echo "Got too many payment files leftover: $payment_files" + # exit 1 + # fi + # timeout-minutes: 10 # - name: Stop the local network and upload logs # if: always() diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index f8b7cf1e59..ec6c019a88 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -7,45 +7,19 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{driver::PendingGetClosestType, SwarmDriver}; -use rand::{rngs::OsRng, Rng}; use tokio::time::Duration; -use crate::target_arch::{interval, Instant, Interval}; +use crate::target_arch::Instant; /// The default interval at which NetworkDiscovery is triggered. The interval is increased as more peers are added to the /// routing table. -pub(crate) const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(10); - -/// Every BOOTSTRAP_CONNECTED_PEERS_STEP connected peer, we step up the BOOTSTRAP_INTERVAL to slow down bootstrapping -/// process -const BOOTSTRAP_CONNECTED_PEERS_STEP: u32 = 5; - -/// If the previously added peer has been before LAST_PEER_ADDED_TIME_LIMIT, then we should slowdown the bootstrapping -/// process. This is to make sure we don't flood the network with `FindNode` msgs. -const LAST_PEER_ADDED_TIME_LIMIT: Duration = Duration::from_secs(180); - -/// A minimum interval to prevent bootstrap got triggered too often -const LAST_BOOTSTRAP_TRIGGERED_TIME_LIMIT: Duration = Duration::from_secs(30); - -/// The bootstrap interval to use if we haven't added any new peers in a while. -const NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S: u64 = 600; +pub(crate) const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(15); impl SwarmDriver { /// This functions triggers network discovery based on when the last peer was added to the RT and the number of - /// peers in RT. The function also returns a new bootstrap interval that is proportional to the number of - /// peers in RT, so more peers in RT, the longer the interval. - pub(crate) async fn run_bootstrap_continuously( - &mut self, - current_bootstrap_interval: Duration, - ) -> Option { - let (should_bootstrap, new_interval) = self - .bootstrap - .should_we_bootstrap(self.peers_in_rt as u32, current_bootstrap_interval) - .await; - if should_bootstrap { - self.trigger_network_discovery(); - } - new_interval + /// peers in RT. 
+ pub(crate) fn run_bootstrap_continuously(&mut self) { + self.trigger_network_discovery(); } pub(crate) fn trigger_network_discovery(&mut self) { @@ -61,27 +35,27 @@ impl SwarmDriver { .get_closest_peers(addr.as_bytes()); let _ = self.pending_get_closest_peers.insert( query_id, - (PendingGetClosestType::NetworkDiscovery, Default::default()), + ( + addr, + PendingGetClosestType::NetworkDiscovery, + Default::default(), + ), ); } self.bootstrap.initiated(); - debug!("Trigger network discovery took {:?}", now.elapsed()); + info!("Trigger network discovery took {:?}", now.elapsed()); } } /// Tracks and helps with the continuous kad::bootstrapping process pub(crate) struct ContinuousBootstrap { - initial_bootstrap_done: bool, - last_peer_added_instant: Instant, last_bootstrap_triggered: Option, } impl ContinuousBootstrap { pub(crate) fn new() -> Self { Self { - initial_bootstrap_done: false, - last_peer_added_instant: Instant::now(), last_bootstrap_triggered: None, } } @@ -90,76 +64,4 @@ impl ContinuousBootstrap { pub(crate) fn initiated(&mut self) { self.last_bootstrap_triggered = Some(Instant::now()); } - - /// Notify about a newly added peer to the RT. This will help with slowing down the bootstrap process. - /// Returns `true` if we have to perform the initial bootstrapping. - pub(crate) fn notify_new_peer(&mut self) -> bool { - self.last_peer_added_instant = Instant::now(); - // true to kick off the initial bootstrapping. `run_bootstrap_continuously` might kick of so soon that we might - // not have a single peer in the RT and we'd not perform any bootstrapping for a while. - if !self.initial_bootstrap_done { - self.initial_bootstrap_done = true; - true - } else { - false - } - } - - /// Returns `true` if we should carry out the Kademlia Bootstrap process immediately. - /// Also optionally returns the new interval to re-bootstrap. - pub(crate) async fn should_we_bootstrap( - &self, - peers_in_rt: u32, - current_interval: Duration, - ) -> (bool, Option) { - let is_ongoing = if let Some(last_bootstrap_triggered) = self.last_bootstrap_triggered { - last_bootstrap_triggered.elapsed() < LAST_BOOTSTRAP_TRIGGERED_TIME_LIMIT - } else { - false - }; - let should_bootstrap = !is_ongoing && peers_in_rt >= 1; - - // if it has been a while (LAST_PEER_ADDED_TIME_LIMIT) since we have added a new peer to our RT, then, slowdown - // the bootstrapping process. - // Don't slow down if we haven't even added one peer to our RT. - if self.last_peer_added_instant.elapsed() > LAST_PEER_ADDED_TIME_LIMIT && peers_in_rt != 0 { - // To avoid a heart beat like cpu usage due to the 1K candidates generation, - // randomize the interval within certain range - let no_peer_added_slowdown_interval: u64 = OsRng.gen_range( - NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S / 2..NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S, - ); - let no_peer_added_slowdown_interval_duration = - Duration::from_secs(no_peer_added_slowdown_interval); - info!( - "It has been {LAST_PEER_ADDED_TIME_LIMIT:?} since we last added a peer to RT. Slowing down the continuous bootstrapping process. Old interval: {current_interval:?}, New interval: {no_peer_added_slowdown_interval_duration:?}" - ); - - // `Interval` ticks immediately for Tokio, but not for `wasmtimer`, which is used for wasm32. 
- #[cfg_attr(target_arch = "wasm32", allow(unused_mut))] - let mut new_interval = interval(no_peer_added_slowdown_interval_duration); - #[cfg(not(target_arch = "wasm32"))] - new_interval.tick().await; - - return (should_bootstrap, Some(new_interval)); - } - - // increment bootstrap_interval in steps of BOOTSTRAP_INTERVAL every BOOTSTRAP_CONNECTED_PEERS_STEP - let step = peers_in_rt / BOOTSTRAP_CONNECTED_PEERS_STEP; - let step = std::cmp::max(1, step); - let new_interval = BOOTSTRAP_INTERVAL * step; - let new_interval = if new_interval > current_interval { - info!("More peers have been added to our RT!. Slowing down the continuous bootstrapping process. Old interval: {current_interval:?}, New interval: {new_interval:?}"); - - // `Interval` ticks immediately for Tokio, but not for `wasmtimer`, which is used for wasm32. - #[cfg_attr(target_arch = "wasm32", allow(unused_mut))] - let mut interval = interval(new_interval); - #[cfg(not(target_arch = "wasm32"))] - interval.tick().await; - - Some(interval) - } else { - None - }; - (should_bootstrap, new_interval) - } } diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index b0eda19190..5ec9ebd827 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -7,24 +7,25 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ + close_group_majority, driver::{PendingGetClosestType, SwarmDriver}, error::{NetworkError, Result}, event::TerminateNodeReason, log_markers::Marker, - multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE, - REPLICATION_PEERS_COUNT, + multiaddr_pop_p2p, sort_peers_by_address_and_limit, GetRecordCfg, GetRecordError, MsgResponder, + NetworkEvent, CLOSE_GROUP_SIZE, }; use libp2p::{ kad::{ store::{Error as StoreError, RecordStore}, - Quorum, Record, RecordKey, + KBucketDistance, Quorum, Record, RecordKey, }, Multiaddr, PeerId, }; use sn_evm::{AttoTokens, PaymentQuote, QuotingMetrics}; use sn_protocol::{ messages::{Cmd, Request, Response}, - storage::{RecordHeader, RecordKind, RecordType}, + storage::{get_type_from_record, RecordType}, NetworkAddress, PrettyPrintRecordKey, }; use std::{ @@ -33,7 +34,6 @@ use std::{ time::Duration, }; use tokio::sync::oneshot; -use xor_name::XorName; use crate::target_arch::Instant; @@ -56,6 +56,15 @@ pub enum NodeIssue { /// Commands to send to the Swarm pub enum LocalSwarmCmd { + // Returns all the peers from all the k-buckets from the local Routing Table. + // This includes our PeerId as well. + GetAllLocalPeersExcludingSelf { + sender: oneshot::Sender>, + }, + /// Return the current GetRange as determined by the SwarmDriver + GetCurrentRange { + sender: oneshot::Sender, + }, /// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that /// bucket. GetKBuckets { @@ -67,8 +76,8 @@ pub enum LocalSwarmCmd { sender: oneshot::Sender>, }, // Get closest peers from the local RoutingTable - GetCloseGroupLocalPeers { - key: NetworkAddress, + GetCloseRangeLocalPeers { + address: NetworkAddress, sender: oneshot::Sender>, }, GetSwarmLocalState(oneshot::Sender), @@ -213,15 +222,11 @@ impl Debug for LocalSwarmCmd { PrettyPrintRecordKey::from(key) ) } - LocalSwarmCmd::GetClosestKLocalPeers { .. } => { write!(f, "LocalSwarmCmd::GetClosestKLocalPeers") } - LocalSwarmCmd::GetCloseGroupLocalPeers { key, .. } => { - write!( - f, - "LocalSwarmCmd::GetCloseGroupLocalPeers {{ key: {key:?} }}" - ) + LocalSwarmCmd::GetCloseRangeLocalPeers { address: key, .. 
} => { + write!(f, "SwarmCmd::GetCloseGroupLocalPeers {{ key: {key:?} }}") } LocalSwarmCmd::GetLocalStoreCost { .. } => { write!(f, "LocalSwarmCmd::GetLocalStoreCost") @@ -242,6 +247,12 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::GetKBuckets { .. } => { write!(f, "LocalSwarmCmd::GetKBuckets") } + LocalSwarmCmd::GetCurrentRange { .. } => { + write!(f, "SwarmCmd::GetCurrentRange") + } + LocalSwarmCmd::GetAllLocalPeersExcludingSelf { .. } => { + write!(f, "SwarmCmd::GetAllLocalPeers") + } LocalSwarmCmd::GetSwarmLocalState { .. } => { write!(f, "LocalSwarmCmd::GetSwarmLocalState") } @@ -472,6 +483,7 @@ impl SwarmDriver { let _ = self.pending_get_closest_peers.insert( query_id, ( + key, PendingGetClosestType::FunctionCall(sender), Default::default(), ), @@ -541,6 +553,7 @@ impl SwarmDriver { Ok(()) } + pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> { let start = Instant::now(); let mut cmd_string; @@ -624,28 +637,7 @@ impl SwarmDriver { let key = record.key.clone(); let record_key = PrettyPrintRecordKey::from(&key); - let record_type = match RecordHeader::from_record(&record) { - Ok(record_header) => { - match record_header.kind { - RecordKind::Chunk => RecordType::Chunk, - RecordKind::Scratchpad => RecordType::Scratchpad, - RecordKind::Spend | RecordKind::Register => { - let content_hash = XorName::from_content(&record.value); - RecordType::NonChunk(content_hash) - } - RecordKind::ChunkWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::ScratchpadWithPayment => { - error!("Record {record_key:?} with payment shall not be stored locally."); - return Err(NetworkError::InCorrectRecordHeader); - } - } - } - Err(err) => { - error!("For record {record_key:?}, failed to parse record_header {err:?}"); - return Err(NetworkError::InCorrectRecordHeader); - } - }; + let record_type = get_type_from_record(&record)?; let result = self .swarm @@ -694,16 +686,8 @@ impl SwarmDriver { // The record_store will prune far records and setup a `distance range`, // once reached the `max_records` cap. - if let Some(distance) = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .get_farthest_replication_distance_bucket() - { - self.replication_fetcher - .set_replication_distance_range(distance); - } + self.replication_fetcher + .set_replication_distance_range(self.get_request_range()); if let Err(err) = result { error!("Can't store verified record {record_key:?} locally: {err:?}"); @@ -760,6 +744,10 @@ impl SwarmDriver { .record_addresses(); let _ = sender.send(addresses); } + LocalSwarmCmd::GetCurrentRange { sender } => { + cmd_string = "GetCurrentRange"; + let _ = sender.send(self.get_request_range()); + } LocalSwarmCmd::GetKBuckets { sender } => { cmd_string = "GetKBuckets"; let mut ilog2_kbuckets = BTreeMap::new(); @@ -778,9 +766,13 @@ impl SwarmDriver { } let _ = sender.send(ilog2_kbuckets); } - LocalSwarmCmd::GetCloseGroupLocalPeers { key, sender } => { - cmd_string = "GetCloseGroupLocalPeers"; - let key = key.as_kbucket_key(); + LocalSwarmCmd::GetAllLocalPeersExcludingSelf { sender } => { + cmd_string = "GetAllLocalPeersExcludingSelf"; + let _ = sender.send(self.get_all_local_peers_excluding_self()); + } + LocalSwarmCmd::GetCloseRangeLocalPeers { address, sender } => { + cmd_string = "GetCloseRangeLocalPeers"; + let key = address.as_kbucket_key(); // calls `kbuckets.closest_keys(key)` internally, which orders the peers by // increasing distance // Note it will return all peers, heance a chop down is required. 
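The `GetCurrentRange` plumbing above, and the `set_replication_distance_range(self.get_request_range())` call a little earlier in cmd.rs, both rely on the driver's new range tracking, which the driver.rs changes later in this diff implement as the median of a rolling window of recently observed peer distances. A minimal sketch of that idea, using a hypothetical `Distance` newtype and a standalone `RangeTracker` struct in place of libp2p's `KBucketDistance` and the SwarmDriver state, might look like this:

```rust
use std::collections::VecDeque;

/// Simplified stand-in for libp2p's `KBucketDistance`, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Debug)]
struct Distance(u128);

/// Number of recent distance samples to keep (mirrors GET_RANGE_STORAGE_LIMIT).
const GET_RANGE_STORAGE_LIMIT: usize = 100;

struct RangeTracker {
    /// Rolling window of recently observed distances, oldest first.
    range_distances: VecDeque<Distance>,
}

impl RangeTracker {
    fn new() -> Self {
        Self {
            range_distances: VecDeque::with_capacity(GET_RANGE_STORAGE_LIMIT),
        }
    }

    /// Record a new distance sample, evicting the oldest once the window is full.
    fn record(&mut self, distance: Distance) {
        if self.range_distances.len() >= GET_RANGE_STORAGE_LIMIT {
            let _ = self.range_distances.pop_front();
        }
        self.range_distances.push_back(distance);
    }

    /// The current get-range: the median of the stored samples, or the default
    /// distance when no samples have been recorded yet.
    fn current_range(&self) -> Distance {
        let mut sorted: Vec<Distance> = self.range_distances.iter().copied().collect();
        sorted.sort_unstable();
        sorted.get(sorted.len() / 2).copied().unwrap_or_default()
    }
}

fn main() {
    let mut tracker = RangeTracker::new();
    for sample in [3u128, 9, 5, 7, 1] {
        tracker.record(Distance(sample));
    }
    // With samples [1, 3, 5, 7, 9] the median (index 2 after sorting) is 5.
    assert_eq!(tracker.current_range(), Distance(5));
    println!("current range: {:?}", tracker.current_range());
}
```

In the diff itself the samples are `KBucketDistance`s pushed by `set_request_range` as GetClosest queries complete, and the median returned by `get_request_range` feeds both the record store's distance range and the replication fetcher.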
@@ -790,7 +782,6 @@ impl SwarmDriver { .kademlia .get_closest_local_peers(&key) .map(|peer| peer.into_preimage()) - .take(CLOSE_GROUP_SIZE) .collect(); let _ = sender.send(closest_peers); @@ -981,24 +972,72 @@ impl SwarmDriver { let _ = self.quotes_history.insert(peer_id, quote); } - fn try_interval_replication(&mut self) -> Result<()> { - // get closest peers from buckets, sorted by increasing distance to us - let our_peer_id = self.self_peer_id.into(); - let closest_k_peers = self + /// From all local peers, returns any within (and just exceeding) current get_range for a given key + pub(crate) fn get_filtered_peers_exceeding_range( + &mut self, + target_address: &NetworkAddress, + ) -> Vec { + let acceptable_distance_range = self.get_request_range(); + let target_key = target_address.as_kbucket_key(); + + let peers = self .swarm .behaviour_mut() .kademlia - .get_closest_local_peers(&our_peer_id) - // Map KBucketKey to PeerId. - .map(|key| key.into_preimage()); - - // Only grab the closest nodes within the REPLICATE_RANGE - let mut replicate_targets = closest_k_peers - .into_iter() - // add some leeway to allow for divergent knowledge - .take(REPLICATION_PEERS_COUNT) + .get_closest_local_peers(&target_key) + .filter_map(|key| { + // here we compare _bucket_, not the exact distance. + // We want to include peers that are just outside the range + // Such that we can and will exceed the range in a search eventually + if acceptable_distance_range.ilog2() < target_key.distance(&key).ilog2() { + return None; + } + + // Map KBucketKey to PeerId. + let peer_id = key.into_preimage(); + Some(peer_id) + }) .collect::>(); + peers + } + + /// From all local peers, returns any within current get_range for a given key + /// Excludes self + pub(crate) fn get_filtered_peers_exceeding_range_or_closest_nodes( + &mut self, + target_address: &NetworkAddress, + ) -> Vec { + let filtered_peers = self.get_filtered_peers_exceeding_range(target_address); + let closest_node_buffer_zone = CLOSE_GROUP_SIZE + close_group_majority(); + if filtered_peers.len() >= closest_node_buffer_zone { + filtered_peers + } else { + warn!("Insufficient peers within replication range of {target_address:?}. Falling back to use {closest_node_buffer_zone:?} closest nodes"); + let all_peers = self.get_all_local_peers_excluding_self(); + match sort_peers_by_address_and_limit( + &all_peers, + target_address, + closest_node_buffer_zone, + ) { + Ok(peers) => peers.iter().map(|p| **p).collect(), + Err(err) => { + error!("sorting peers close to {target_address:?} failed, sort error: {err:?}"); + warn!( + "Using all peers within range even though it's less than CLOSE_GROUP_SIZE." 
+ ); + filtered_peers + } + } + } + } + + fn try_interval_replication(&mut self) -> Result<()> { + let our_address = NetworkAddress::from_peer(self.self_peer_id); + + let mut replicate_targets = + self.get_filtered_peers_exceeding_range_or_closest_nodes(&our_address); + let now = Instant::now(); self.replication_targets .retain(|_peer_id, timestamp| *timestamp > now); diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index ec716cb4df..f432d231fc 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -20,6 +20,7 @@ use crate::{ record_store_api::UnifiedRecordStore, relay_manager::RelayManager, replication_fetcher::ReplicationFetcher, + sort_peers_by_distance_to, target_arch::{interval, spawn, Instant}, GetRecordError, Network, CLOSE_GROUP_SIZE, }; @@ -32,7 +33,6 @@ use futures::future::Either; use futures::StreamExt; #[cfg(feature = "local")] use libp2p::mdns; -use libp2p::Transport as _; use libp2p::{core::muxing::StreamMuxerBox, relay}; use libp2p::{ identity::Keypair, @@ -45,6 +45,7 @@ use libp2p::{ }, Multiaddr, PeerId, }; +use libp2p::{kad::KBucketDistance, Transport as _}; #[cfg(feature = "open-metrics")] use prometheus_client::metrics::info::Info; use sn_evm::PaymentQuote; @@ -59,10 +60,9 @@ use sn_protocol::{ }; use sn_registers::SignedRegister; use std::{ - collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}, + collections::{btree_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}, fmt::Debug, net::SocketAddr, - num::NonZeroUsize, path::PathBuf, }; use tokio::sync::{mpsc, oneshot}; @@ -77,6 +77,9 @@ pub(crate) const CLOSET_RECORD_CHECK_INTERVAL: Duration = Duration::from_secs(15 /// Interval over which we query relay manager to check if we can make any more reservations. pub(crate) const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30); +// Number of range distances to keep in the circular buffer +pub const GET_RANGE_STORAGE_LIMIT: usize = 100; + const KAD_STREAM_PROTOCOL_ID: StreamProtocol = StreamProtocol::new("/autonomi/kad/1.0.0"); /// The ways in which the Get Closest queries are used. @@ -87,7 +90,9 @@ pub(crate) enum PendingGetClosestType { /// These are queries made by a function at the upper layers and contains a channel to send the result back. FunctionCall(oneshot::Sender>), } -type PendingGetClosest = HashMap)>; + +/// Maps a query to the address, the type of query and the peers that are being queried. +type PendingGetClosest = HashMap)>; /// Using XorName to differentiate different record content under the same key. type GetRecordResultMap = HashMap)>; @@ -123,13 +128,6 @@ const NETWORKING_CHANNEL_SIZE: usize = 10_000; /// Time before a Kad query times out if no response is received const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10); -// Init during compilation, instead of runtime error that should never happen -// Option::expect will be stabilised as const in the future (https://github.com/rust-lang/rust/issues/67441) -const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) { - Some(v) => v, - None => panic!("CLOSE_GROUP_SIZE should not be zero"), -}; - /// The various settings to apply to when fetching a record from network #[derive(Clone)] pub struct GetRecordCfg { @@ -349,8 +347,6 @@ impl NetworkBuilder { .set_publication_interval(None) // 1mb packet size .set_max_packet_size(MAX_PACKET_SIZE) - // How many nodes _should_ store data. 
- .set_replication_factor(REPLICATION_FACTOR) .set_query_timeout(KAD_QUERY_TIMEOUT_S) // Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes. .disjoint_query_paths(true) @@ -429,9 +425,7 @@ impl NetworkBuilder { .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual) .set_max_packet_size(MAX_PACKET_SIZE) // Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes. - .disjoint_query_paths(true) - // How many nodes _should_ store data. - .set_replication_factor(REPLICATION_FACTOR); + .disjoint_query_paths(true); let (network, net_event_recv, driver) = self.build( kad_cfg, @@ -697,6 +691,8 @@ impl NetworkBuilder { bad_nodes: Default::default(), quotes_history: Default::default(), replication_targets: Default::default(), + range_distances: VecDeque::with_capacity(GET_RANGE_STORAGE_LIMIT), + first_contact_made: false, }; let network = Network::new( @@ -732,7 +728,7 @@ pub struct SwarmDriver { pub(crate) local_cmd_sender: mpsc::Sender, local_cmd_receiver: mpsc::Receiver, network_cmd_receiver: mpsc::Receiver, - event_sender: mpsc::Sender, // Use `self.send_event()` to send a NetworkEvent. + pub(crate) event_sender: mpsc::Sender, // Use `self.send_event()` to send a NetworkEvent. /// Trackers for underlying behaviour related events pub(crate) pending_get_closest_peers: PendingGetClosest, @@ -755,6 +751,13 @@ pub struct SwarmDriver { pub(crate) bad_nodes: BadNodes, pub(crate) quotes_history: BTreeMap, pub(crate) replication_targets: BTreeMap, + + // The recent range_distances calculated by the node + // Each update is generated when there is a routing table change + // We use the largest of these X_STORAGE_LIMIT values as our X distance. 
+ pub(crate) range_distances: VecDeque, + // have we found out initial peer + pub(crate) first_contact_made: bool, } impl SwarmDriver { @@ -805,28 +808,24 @@ impl SwarmDriver { // logging for handling events happens inside handle_swarm_events // otherwise we're rewriting match statements etc around this anwyay if let Err(err) = self.handle_swarm_events(swarm_event) { - warn!("Error while handling swarm event: {err}"); + warn!("Issue while handling swarm event: {err}"); } }, // thereafter we can check our intervals // runs every bootstrap_interval time _ = bootstrap_interval.tick() => { - if let Some(new_interval) = self.run_bootstrap_continuously(bootstrap_interval.period()).await { - bootstrap_interval = new_interval; - } + self.run_bootstrap_continuously(); } _ = set_farthest_record_interval.tick() => { if !self.is_client { - let closest_k_peers = self.get_closest_k_value_local_peers(); - - if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) { - info!("Set responsible range to {distance}"); - // set any new distance to farthest record in the store - self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance); - // the distance range within the replication_fetcher shall be in sync as well - self.replication_fetcher.set_replication_distance_range(distance); - } + let get_range = self.get_request_range(); + self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(get_range); + + // the distance range within the replication_fetcher shall be in sync as well + self.replication_fetcher.set_replication_distance_range(get_range); + + } } _ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes), @@ -838,32 +837,90 @@ impl SwarmDriver { // ---------- Crate helpers ------------------- // -------------------------------------------- - /// Uses the closest k peers to estimate the farthest address as - /// `K_VALUE / 2`th peer's bucket. - fn get_responsbile_range_estimate( + /// Defines a new X distance range to be used for GETs and data replication + /// + /// Enumerates buckets and generates a random distance in the first bucket + /// that has at least `MIN_PEERS_IN_BUCKET` peers. + /// + pub(crate) fn set_request_range( &mut self, - // Sorted list of closest k peers to our peer id. - closest_k_peers: &[PeerId], - ) -> Option { - // if we don't have enough peers we don't set the distance range yet. 
- let mut farthest_distance = None; - - if closest_k_peers.is_empty() { - return farthest_distance; + queried_address: NetworkAddress, + network_discovery_peers: &[PeerId], + ) { + info!( + "Adding a GetRange to our stash deriving from {:?} peers", + network_discovery_peers.len() + ); + + let sorted_distances = sort_peers_by_distance_to(network_discovery_peers, queried_address); + + let mapped: Vec<_> = sorted_distances.iter().map(|d| d.ilog2()).collect(); + info!("Sorted distances: {:?}", mapped); + + let farthest_peer_to_check = self + .get_all_local_peers_excluding_self() + .len() + .checked_div(3 * CLOSE_GROUP_SIZE) + .unwrap_or(1); + + info!("Farthest peer we'll check: {:?}", farthest_peer_to_check); + + let yardstick = if sorted_distances.len() >= farthest_peer_to_check { + sorted_distances.get(farthest_peer_to_check.saturating_sub(1)) + } else { + sorted_distances.last() + }; + if let Some(distance) = yardstick { + if self.range_distances.len() >= GET_RANGE_STORAGE_LIMIT { + if let Some(distance) = self.range_distances.pop_front() { + trace!("Removed distance range: {:?}", distance.ilog2()); + } + } + + info!("Adding new distance range: {:?}", distance.ilog2()); + + self.range_distances.push_back(*distance); } - let our_address = NetworkAddress::from_peer(self.self_peer_id); + info!( + "Distance between peers in set_request_range call: {:?}", + yardstick + ); + } + + /// Returns the KBucketDistance we are currently using as our X value + /// for range based search. + pub(crate) fn get_request_range(&self) -> KBucketDistance { + let mut sorted_distances = self.range_distances.iter().collect::>(); - // get `K_VALUE / 2`th peer's address distance - // This is a rough estimate of the farthest address we might be responsible for. - // We want this to be higher than actually necessary, so we retain more data - // and can be sure to pass bad node checks - let target_index = std::cmp::min(K_VALUE.get() / 2, closest_k_peers.len()) - 1; + sorted_distances.sort_unstable(); - let address = NetworkAddress::from_peer(closest_k_peers[target_index]); - farthest_distance = our_address.distance(&address).ilog2(); + let median_index = sorted_distances.len() / 2; - farthest_distance + let default = KBucketDistance::default(); + let median = sorted_distances.get(median_index).cloned(); + + if let Some(dist) = median { + *dist + } else { + default + } + } + + /// get all the peers from our local RoutingTable. 
Excluding self + pub(crate) fn get_all_local_peers_excluding_self(&mut self) -> Vec { + let our_peer_id = self.self_peer_id; + let mut all_peers: Vec = vec![]; + for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() { + for entry in kbucket.iter() { + let id = entry.node.key.into_preimage(); + + if id != our_peer_id { + all_peers.push(id); + } + } + } + all_peers } /// Pushes NetworkSwarmCmd off thread so as to be non-blocking diff --git a/sn_networking/src/error.rs b/sn_networking/src/error.rs index 6534c84017..99bf1fbe92 100644 --- a/sn_networking/src/error.rs +++ b/sn_networking/src/error.rs @@ -30,10 +30,11 @@ pub(super) type Result = std::result::Result; #[derive(Error, Clone)] pub enum GetRecordError { #[error("Get Record completed with non enough copies")] - NotEnoughCopies { + NotEnoughCopiesInRange { record: Record, expected: usize, got: usize, + range: u32, }, #[error("Record not found in the network")] @@ -55,16 +56,18 @@ pub enum GetRecordError { impl Debug for GetRecordError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::NotEnoughCopies { + Self::NotEnoughCopiesInRange { record, expected, got, + range, } => { let pretty_key = PrettyPrintRecordKey::from(&record.key); - f.debug_struct("NotEnoughCopies") + f.debug_struct("NotEnoughCopiesInRange") .field("record_key", &pretty_key) .field("expected", &expected) .field("got", &got) + .field("range", &range) .finish() } Self::RecordNotFound => write!(f, "RecordNotFound"), @@ -122,9 +125,6 @@ pub enum NetworkError { #[error("The RecordKind obtained from the Record did not match with the expected kind: {0}")] RecordKindMismatch(RecordKind), - #[error("Record header is incorrect")] - InCorrectRecordHeader, - // ---------- Transfer Errors #[error("Failed to get spend: {0}")] FailedToGetSpend(String), @@ -138,7 +138,7 @@ pub enum NetworkError { // ---------- Spend Errors #[error("Spend not found: {0:?}")] NoSpendFoundInsideRecord(SpendAddress), - #[error("Double spend(s) attempt was detected. The signed spends are: {0:?}")] + #[error("Double SpendAttempt was detected. The signed spends are: {0:?}")] DoubleSpendAttempt(Vec), // ---------- Store Error diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index 6551f6e5f0..88a2a7ffca 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -7,21 +7,26 @@ // permissions and limitations relating to use of the SAFE Network Software. 
use crate::{ - driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record, - target_arch::Instant, GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, - CLOSE_GROUP_SIZE, + cmd::NetworkSwarmCmd, driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, + GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE, }; use itertools::Itertools; -use libp2p::kad::{ - self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult, - QueryStats, Record, K_VALUE, +use libp2p::{ + kad::{ + self, GetClosestPeersError, InboundRequest, KBucketDistance, PeerRecord, ProgressStep, + QueryId, QueryResult, QueryStats, Quorum, Record, K_VALUE, + }, + PeerId, }; use sn_protocol::{ - storage::{try_serialize_record, RecordKind}, - PrettyPrintRecordKey, + messages::{Cmd, Request}, + storage::get_type_from_record, + NetworkAddress, PrettyPrintRecordKey, +}; +use std::{ + collections::{hash_map::Entry, HashSet}, + time::Instant, }; -use sn_transfers::SignedSpend; -use std::collections::{hash_map::Entry, BTreeSet, HashSet}; use tokio::sync::oneshot; use xor_name::XorName; @@ -31,6 +36,9 @@ impl SwarmDriver { let event_string; match kad_event { + // We use this query both to bootstrap and populate our routing table, + // but also to define our GetRange as defined by the largest distance between + // peers in any recent GetClosest call. kad::Event::OutboundQueryProgressed { id, result: QueryResult::GetClosestPeers(Ok(ref closest_peers)), @@ -45,7 +53,7 @@ impl SwarmDriver { ); if let Entry::Occupied(mut entry) = self.pending_get_closest_peers.entry(id) { - let (_, current_closest) = entry.get_mut(); + let (_, _, current_closest) = entry.get_mut(); // TODO: consider order the result and terminate when reach any of the // following criteria: @@ -53,16 +61,19 @@ impl SwarmDriver { // 2, `stats.duration()` is longer than a defined period current_closest.extend(closest_peers.peers.iter().map(|i| i.peer_id)); if current_closest.len() >= usize::from(K_VALUE) || step.last { - let (get_closest_type, current_closest) = entry.remove(); - match get_closest_type { - PendingGetClosestType::NetworkDiscovery => self - .network_discovery - .handle_get_closest_query(current_closest), - PendingGetClosestType::FunctionCall(sender) => { - sender - .send(current_closest) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } + let (address, get_closest_type, current_closest) = entry.remove(); + self.network_discovery + .handle_get_closest_query(¤t_closest); + + if let PendingGetClosestType::FunctionCall(sender) = get_closest_type { + sender + .send(current_closest) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } else { + // do not set this via function calls, as that could potentially + // skew the results in favour of heavily queried (and manipulated) + // areas of the network + self.set_request_range(address, ¤t_closest); } } } else { @@ -81,9 +92,8 @@ impl SwarmDriver { ref step, } => { event_string = "kad_event::get_closest_peers_err"; - error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}"); - let (get_closest_type, mut current_closest) = + let (address, get_closest_type, mut current_closest) = self.pending_get_closest_peers.remove(&id).ok_or_else(|| { debug!( "Can't locate query task {id:?}, it has likely been completed already." @@ -100,13 +110,23 @@ impl SwarmDriver { match err { GetClosestPeersError::Timeout { ref peers, .. 
} => { current_closest.extend(peers.iter().map(|i| i.peer_id)); + if current_closest.len() < CLOSE_GROUP_SIZE { + error!( + "GetClosest Query task {id:?} errored, not enough found. {err:?}, {stats:?} - {step:?}" + ); + } } } match get_closest_type { - PendingGetClosestType::NetworkDiscovery => self - .network_discovery - .handle_get_closest_query(current_closest), + PendingGetClosestType::NetworkDiscovery => { + // do not set this via function calls, as that could potentially + // skew the results in favour of heavily queried (and manipulated) + // areas of the network + self.set_request_range(address, ¤t_closest); + self.network_discovery + .handle_get_closest_query(¤t_closest); + } PendingGetClosestType::FunctionCall(sender) => { sender .send(current_closest) @@ -127,7 +147,7 @@ impl SwarmDriver { PrettyPrintRecordKey::from(&peer_record.record.key), peer_record.peer ); - self.accumulate_get_record_found(id, peer_record, stats, step)?; + self.accumulate_get_record_found(id, peer_record)?; } kad::Event::OutboundQueryProgressed { id, @@ -248,12 +268,13 @@ impl SwarmDriver { event_string = "kad_event::RoutingUpdated"; if is_new_peer { self.update_on_peer_addition(peer); + } + if !self.first_contact_made { // This should only happen once - if self.bootstrap.notify_new_peer() { - info!("Performing the first bootstrap"); - self.trigger_network_discovery(); - } + self.first_contact_made = true; + info!("Performing the first bootstrap"); + self.trigger_network_discovery(); } info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.peers_in_rt); @@ -320,6 +341,7 @@ impl SwarmDriver { // `QueryStats::requests` to be 20 (K-Value) // `QueryStats::success` to be over majority of the requests // `err::NotFound::closest_peers` contains a list of CLOSE_GROUP_SIZE peers + // // 2, targeting an existing entry // there will a sequence of (at least CLOSE_GROUP_SIZE) events of // `kad::Event::OutboundQueryProgressed` to be received @@ -333,26 +355,30 @@ impl SwarmDriver { // where: `cache_candidates`: being the peers supposed to hold the record but not // `ProgressStep::count`: to be `number of received copies plus one` // `ProgressStep::last` to be `true` + // + // /// Accumulates the GetRecord query results - /// If we get enough responses (quorum) for a record with the same content hash: + /// If we get enough responses (ie exceed GetRange) for a record with the same content hash: /// - we return the Record after comparing with the target record. This might return RecordDoesNotMatch if the /// check fails. /// - if multiple content hashes are found, we return a SplitRecord Error /// And then we stop the kad query as we are done here. + /// We do not need to wait for GetRange to be exceeded here and should return early. fn accumulate_get_record_found( &mut self, query_id: QueryId, peer_record: PeerRecord, - _stats: QueryStats, - step: ProgressStep, ) -> Result<()> { + let expected_get_range = self.get_request_range(); + let key = peer_record.record.key.clone(); + let peer_id = if let Some(peer_id) = peer_record.peer { peer_id } else { self.self_peer_id }; - let pretty_key = PrettyPrintRecordKey::from(&peer_record.record.key).into_owned(); + let pretty_key = PrettyPrintRecordKey::from(&key).into_owned(); if let Entry::Occupied(mut entry) = self.pending_get_record.entry(query_id) { let (_key, _senders, result_map, cfg) = entry.get_mut(); @@ -367,92 +393,97 @@ impl SwarmDriver { // Insert the record and the peer into the result_map. 
let record_content_hash = XorName::from_content(&peer_record.record.value); - let responded_peers = + + let peer_list = if let Entry::Occupied(mut entry) = result_map.entry(record_content_hash) { let (_, peer_list) = entry.get_mut(); + let _ = peer_list.insert(peer_id); - peer_list.len() + peer_list.clone() } else { let mut peer_list = HashSet::new(); let _ = peer_list.insert(peer_id); - result_map.insert(record_content_hash, (peer_record.record.clone(), peer_list)); - 1 + result_map.insert( + record_content_hash, + (peer_record.record.clone(), peer_list.clone()), + ); + + peer_list }; + let responded_peers = peer_list.len(); + let expected_answers = get_quorum_value(&cfg.get_quorum); + trace!("Expecting {expected_answers:?} answers to exceed {expected_get_range:?} for record {pretty_key:?} task {query_id:?}, received {responded_peers} so far"); + // return error if the entry cannot be found + return Err(NetworkError::ReceivedKademliaEventDropped { + query_id, + event: format!("Accumulate Get Record of {pretty_key:?}"), + }); + } + Ok(()) + } - debug!("Expecting {expected_answers:?} answers for record {pretty_key:?} task {query_id:?}, received {responded_peers} so far"); + /// Checks passed peers from a request and checks they are sufficiently spaced to + /// ensure we have searched enough of the network range as determined by our `get_range` + /// + /// We expect any conflicting records to have been reported prior to this check, + /// so we assume we're returning unique records only. + fn have_we_have_searched_thoroughly_for_quorum( + expected_get_range: KBucketDistance, + searched_peers_list: &HashSet, + data_key_address: &NetworkAddress, + quorum: &Quorum, + ) -> bool { + info!("Assessing search: range: {:?}, address: {data_key_address:?}, quorum required: {quorum:?}, peers_returned_count: {:?}", expected_get_range.ilog2(), searched_peers_list.len()); + let is_sensitive_data = matches!(quorum, Quorum::All); + + let required_quorum = get_quorum_value(quorum); + + let met_quorum = searched_peers_list.len() >= required_quorum; + + // we only enforce range if we have sensitive data...for data spends quorum::all + if met_quorum && !is_sensitive_data { + return true; + } - if responded_peers >= expected_answers { - if !cfg.expected_holders.is_empty() { - debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with non-responded expected holders {:?}", cfg.expected_holders); - } - let cfg = cfg.clone(); + // get the farthest distance between peers in the response + let mut max_distance_to_data_from_responded_nodes = KBucketDistance::default(); - // Remove the query task and consume the variables. 
- let (_key, senders, result_map, _) = entry.remove(); + // iterate over peers and see if the distance to the data is greater than the get_range + for peer_id in searched_peers_list.iter() { + let peer_address = NetworkAddress::from_peer(*peer_id); + let distance_to_data = peer_address.distance(data_key_address); + if max_distance_to_data_from_responded_nodes < distance_to_data { + max_distance_to_data_from_responded_nodes = distance_to_data; + } + } - if result_map.len() == 1 { - Self::send_record_after_checking_target(senders, peer_record.record, &cfg)?; - } else { - debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record"); - let mut accumulated_spends = BTreeSet::new(); - for (record, _) in result_map.values() { - match get_raw_signed_spends_from_record(record) { - Ok(spends) => { - accumulated_spends.extend(spends); - } - Err(_) => { - continue; - } - } - } - if !accumulated_spends.is_empty() { - info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record"); - let accumulated_spends = - accumulated_spends.into_iter().collect::>(); - - let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?; - - let new_accumulated_record = Record { - key: peer_record.record.key, - value: bytes.to_vec(), - publisher: None, - expires: None, - }; - for sender in senders { - let new_accumulated_record = new_accumulated_record.clone(); + // use ilog2 as simplified distance check + // It allows us to say "we've searched up to and including this bucket" + // as opposed to the concrete distance itself (which statistically seems like we can fall outwith a range + // quite easily with a small number of peers) + let exceeded_request_range = if max_distance_to_data_from_responded_nodes.ilog2() + < expected_get_range.ilog2() + { + let dist = max_distance_to_data_from_responded_nodes.ilog2(); + let expected_dist = expected_get_range.ilog2(); - sender - .send(Ok(new_accumulated_record)) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - } else { - for sender in senders { - let result_map = result_map.clone(); - sender - .send(Err(GetRecordError::SplitRecord { result_map })) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - } - } + warn!("RANGE: {data_key_address:?} Insufficient GetRange searched. {dist:?} {expected_dist:?} {max_distance_to_data_from_responded_nodes:?} is less than expcted GetRange of {expected_get_range:?}"); - // Stop the query; possibly stops more nodes from being queried. - if let Some(mut query) = self.swarm.behaviour_mut().kademlia.query_mut(&query_id) { - query.finish(); - } - } else if usize::from(step.count) >= CLOSE_GROUP_SIZE { - debug!("For record {pretty_key:?} task {query_id:?}, got {:?} with {} versions so far.", - step.count, result_map.len()); - } + false } else { - // return error if the entry cannot be found - return Err(NetworkError::ReceivedKademliaEventDropped { - query_id, - event: format!("Accumulate Get Record of {pretty_key:?}"), - }); + true + }; + + // We assume a finalised query has searched as far as it can in libp2p + + if exceeded_request_range && met_quorum { + warn!("RANGE: {data_key_address:?} Request satisfied as exceeded request range : {exceeded_request_range:?} and Quorum satisfied with {:?} peers exceeding quorum {required_quorum:?}", searched_peers_list.len()); + return true; } - Ok(()) + + false } /// Handles the possible cases when a GetRecord Query completes. 
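To make the "searched thoroughly" rule above concrete: a GET is accepted once quorum is met, and for sensitive data (`Quorum::All`) the responders must additionally span at least the current get-range, compared at ilog2/bucket granularity. A rough, self-contained sketch of that decision, modelling distances as plain `u32` bucket indexes instead of `KBucketDistance` and assuming a close-group size of 5, could be:

```rust
/// Bucket index (ilog2 of the XOR distance); a simplified stand-in for
/// `KBucketDistance::ilog2()`, used here for illustration only.
type BucketIndex = u32;

#[derive(Clone, Copy)]
enum Quorum {
    All,
    N(usize),
}

/// How many responses a quorum requires (assumes CLOSE_GROUP_SIZE == 5).
fn quorum_value(quorum: Quorum) -> usize {
    match quorum {
        Quorum::All => 5,
        Quorum::N(n) => n,
    }
}

/// True when enough peers responded and, for `Quorum::All`, when the farthest
/// responder is at least as far from the key as the expected get-range.
fn searched_thoroughly(
    expected_range: BucketIndex,
    responder_distances: &[BucketIndex],
    quorum: Quorum,
) -> bool {
    let met_quorum = responder_distances.len() >= quorum_value(quorum);
    let is_sensitive = matches!(quorum, Quorum::All);

    // Non-sensitive data: meeting quorum alone is enough.
    if met_quorum && !is_sensitive {
        return true;
    }

    // Otherwise the responders must also cover the whole expected range.
    let max_distance = responder_distances.iter().copied().max().unwrap_or(0);
    met_quorum && max_distance >= expected_range
}

fn main() {
    // Five responders, none beyond bucket 4: not thorough enough for Quorum::All.
    assert!(!searched_thoroughly(6, &[1, 2, 3, 4, 4], Quorum::All));
    // The same responders easily satisfy a plain quorum of three.
    assert!(searched_thoroughly(6, &[1, 2, 3, 4, 4], Quorum::N(3)));
    // Once a responder sits at or beyond the expected range, Quorum::All passes.
    assert!(searched_thoroughly(6, &[1, 2, 3, 6, 7], Quorum::All));
    println!("range checks behave as expected");
}
```

When this check fails under `Quorum::All`, the code in the next hunk re-triggers network discovery and replicates the record out to peers within range as a fallback.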
@@ -469,16 +500,92 @@ impl SwarmDriver { let (result, log_string) = if let Some((record, from_peers)) = result_map.values().next() { - let result = if num_of_versions == 1 { - Err(GetRecordError::NotEnoughCopies { - record: record.clone(), - expected: get_quorum_value(&cfg.get_quorum), - got: from_peers.len(), - }) - } else { + let data_key_address = NetworkAddress::from_record_key(&record.key); + let expected_get_range = self.get_request_range(); + + let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum( + expected_get_range, + from_peers, + &data_key_address, + &cfg.get_quorum, + ); + + let pretty_key = PrettyPrintRecordKey::from(&record.key); + info!("RANGE: {pretty_key:?} we_have_searched_far_enough: {we_have_searched_thoroughly:?}"); + + let result = if num_of_versions > 1 { + warn!("RANGE: more than one version found!"); Err(GetRecordError::SplitRecord { result_map: result_map.clone(), }) + } else if we_have_searched_thoroughly { + warn!("RANGE: Get record finished: {pretty_key:?} Enough of the network has responded or it's not sensitive data... and we only have one copy..."); + + Ok(record.clone()) + } else { + // We have not searched enough of the network range. + let result = Err(GetRecordError::NotEnoughCopiesInRange { + record: record.clone(), + expected: get_quorum_value(&cfg.get_quorum), + got: from_peers.len(), + range: expected_get_range.ilog2().unwrap_or(0), + }); + + // This should be a backstop... Quorum::All is the only one that enforces + // a full search of the network range. + if matches!(cfg.get_quorum, Quorum::All) { + warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need to extend the range and PUT the data. {result:?}"); + + warn!("Re-putting data to network {pretty_key:?}..."); + + // let's ensure we have an updated network view + self.trigger_network_discovery(); + + warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need to PUT the data back into nodes in that range."); + + let record_type = get_type_from_record(record)?; + + let replicate_targets: HashSet<_> = self + .get_filtered_peers_exceeding_range_or_closest_nodes(&data_key_address) + .iter() + .cloned() + .collect(); + + if from_peers == &replicate_targets { + warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!"); + } + + // set holder to someone that has the data + let holder = NetworkAddress::from_peer( + from_peers + .iter() + .next() + .cloned() + .unwrap_or(self.self_peer_id), + ); + + for peer in replicate_targets { + warn!("Re-putting data to {peer:?} for {pretty_key:?} if needed..."); + // Do not send to any peer that has already informed us + if from_peers.contains(&peer) { + continue; + } + + debug!("RANGE: (insufficient range searched) Sending data to unresponded peer: {peer:?} for {pretty_key:?}"); + + // nodes will try/fail to replicate it from us, but will grab it from the network thereafter + self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest { + req: Request::Cmd(Cmd::Replicate { + holder: holder.clone(), + keys: vec![(data_key_address.clone(), record_type.clone())], + }), + peer, + sender: None, + }); + } + } + + result }; ( @@ -508,8 +615,6 @@ impl SwarmDriver { .map_err(|_| NetworkError::InternalMsgChannelDropped)?; } } else { - // We manually perform `query.finish()` if we return early from accumulate fn. - // Thus we will still get FinishedWithNoAdditionalRecord. debug!("Can't locate query task {query_id:?} during GetRecord finished. 
We might have already returned the result to the sender."); } Ok(()) diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs index 4550772bf4..ca6808ed1b 100644 --- a/sn_networking/src/event/request_response.rs +++ b/sn_networking/src/event/request_response.rs @@ -7,17 +7,21 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError, - NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, + cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address_and_limit, MsgResponder, + NetworkError, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, }; -use itertools::Itertools; -use libp2p::request_response::{self, Message}; -use rand::{rngs::OsRng, thread_rng, Rng}; +use libp2p::{ + kad::RecordKey, + request_response::{self, Message}, + PeerId, +}; +use rand::{rngs::OsRng, Rng}; use sn_protocol::{ messages::{CmdResponse, Request, Response}, storage::RecordType, NetworkAddress, }; +use std::collections::HashMap; impl SwarmDriver { /// Forwards `Request` to the upper layers using `Sender`. Sends `Response` to the peers @@ -190,6 +194,9 @@ impl SwarmDriver { sender: NetworkAddress, incoming_keys: Vec<(NetworkAddress, RecordType)>, ) { + let peers = self.get_all_local_peers_excluding_self(); + let our_peer_id = self.self_peer_id; + let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id } else { @@ -202,16 +209,12 @@ impl SwarmDriver { incoming_keys.len() ); - // accept replication requests from the K_VALUE peers away, - // giving us some margin for replication - let closest_k_peers = self.get_closest_k_value_local_peers(); - if !closest_k_peers.contains(&holder) || holder == self.self_peer_id { - debug!("Holder {holder:?} is self or not in replication range."); + // accept replication requests from all known peers within our GetRange + if !peers.contains(&holder) || holder == our_peer_id { + trace!("Holder {holder:?} is self or not in replication range."); return; } - let more_than_one_key = incoming_keys.len() > 1; - // On receive a replication_list from a close_group peer, we undertake two tasks: // 1, For those keys that we don't have: // fetch them if close enough to us @@ -224,81 +227,94 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .record_addresses_ref(); - let keys_to_fetch = self - .replication_fetcher - .add_keys(holder, incoming_keys, all_keys); + .record_addresses_ref() + .clone(); + + let keys_to_fetch = + self.replication_fetcher + .add_keys(holder, incoming_keys, &all_keys, &peers); + if keys_to_fetch.is_empty() { debug!("no waiting keys to fetch from the network"); } else { self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); } - // Only trigger chunk_proof check based every X% of the time - let mut rng = thread_rng(); - // 5% probability - if more_than_one_key && rng.gen_bool(0.05) { - self.verify_peer_storage(sender.clone()); + let event_sender = self.event_sender.clone(); + let _handle = tokio::spawn(async move { + let keys_to_verify = + Self::select_verification_data_candidates(&peers, &all_keys, &sender); + + if keys_to_verify.is_empty() { + debug!("No valid candidate to be checked against peer {holder:?}"); + } else if let Err(error) = event_sender + .send(NetworkEvent::ChunkProofVerification { + peer_id: holder, + keys_to_verify, + }) + .await + { + error!("SwarmDriver failed to send event: {}", error); } // In additon to verify the sender, we also verify a 
random close node. // This is to avoid malicious node escaping the check by never send a replication_list. // With further reduced probability of 1% (5% * 20%) - if rng.gen_bool(0.2) { - let close_group_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(CLOSE_GROUP_SIZE) - .collect_vec(); - if close_group_peers.len() == CLOSE_GROUP_SIZE { - loop { - let index: usize = OsRng.gen_range(0..close_group_peers.len()); - let candidate = NetworkAddress::from_peer(close_group_peers[index]); - if sender != candidate { - self.verify_peer_storage(candidate); - break; - } + let close_group_peers = sort_peers_by_address_and_limit( + &peers, + &NetworkAddress::from_peer(our_peer_id), + CLOSE_GROUP_SIZE, + ) + .unwrap_or_default(); + + loop { + let index: usize = OsRng.gen_range(0..close_group_peers.len()); + let candidate_peer_id = *close_group_peers[index]; + let candidate = NetworkAddress::from_peer(*close_group_peers[index]); + if sender != candidate { + let keys_to_verify = + Self::select_verification_data_candidates(&peers, &all_keys, &candidate); + + if keys_to_verify.is_empty() { + debug!("No valid candidate to be checked against peer {candidate:?}"); + } else if let Err(error) = event_sender + .send(NetworkEvent::ChunkProofVerification { + peer_id: candidate_peer_id, + keys_to_verify, + }) + .await + { + error!("SwarmDriver failed to send event: {}", error); } + + break; } } - } + }); } /// Check among all chunk type records that we have, select those close to the peer, /// and randomly pick one as the verification candidate. - fn verify_peer_storage(&mut self, peer: NetworkAddress) { - let mut closest_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(20) - .collect_vec(); - closest_peers.push(self.self_peer_id); - + fn select_verification_data_candidates( + all_peers: &Vec<PeerId>, + all_keys: &HashMap<RecordKey, (NetworkAddress, RecordType)>, + peer: &NetworkAddress, + ) -> Vec<NetworkAddress> { let target_peer = if let Some(peer_id) = peer.as_peer_id() { peer_id } else { error!("Target {peer:?} is not a valid PeerId"); - return; + return vec![]; }; - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - // Targeted chunk type record shall be expected within the close range from our perspective. let mut verify_candidates: Vec<NetworkAddress> = all_keys .values() .filter_map(|(addr, record_type)| { if RecordType::Chunk == *record_type { - match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { + // Here we take the actual closest, as this is where we want to be + // strict about who does have the data... + match sort_peers_by_address_and_limit(all_peers, addr, CLOSE_GROUP_SIZE) { Ok(close_group) => { if close_group.contains(&&target_peer) { Some(addr.clone()) @@ -319,17 +335,6 @@ impl SwarmDriver { verify_candidates.sort_by_key(|a| peer.distance(a)); - // To ensure the candidate mush have to be held by the peer, - // we only carry out check when there are already certain amount of chunks uploaded - // AND choose candidate from certain reduced range. 
- if verify_candidates.len() > 50 { - let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); - self.send_event(NetworkEvent::ChunkProofVerification { - peer_id: target_peer, - keys_to_verify: vec![verify_candidates[index].clone()], - }); - } else { - debug!("No valid candidate to be checked against peer {peer:?}"); - } + verify_candidates } } diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index 982088f102..2416b5681c 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -7,8 +7,8 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, relay_manager::is_a_relayed_peer, - target_arch::Instant, NetworkEvent, Result, SwarmDriver, + cmd::NetworkSwarmCmd, event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, + relay_manager::is_a_relayed_peer, target_arch::Instant, NetworkEvent, Result, SwarmDriver, }; #[cfg(feature = "local")] use libp2p::mdns; @@ -25,7 +25,7 @@ use libp2p::{ }; use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}; use std::collections::HashSet; -use tokio::time::Duration; +use tokio::{sync::oneshot, time::Duration}; impl SwarmDriver { /// Handle `SwarmEvents` @@ -244,7 +244,7 @@ impl SwarmDriver { } // If we are not local, we care only for peers that we dialed and thus are reachable. - if self.local || has_dialed { + if !self.local && has_dialed { // A bad node cannot establish a connection with us. So we can add it to the RT directly. self.remove_bootstrap_from_full(peer_id); @@ -254,7 +254,10 @@ impl SwarmDriver { multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) }); } + } + if self.local || has_dialed { + // If we are not local, we care only for peers that we dialed and thus are reachable. debug!(%peer_id, ?addrs, "identify: attempting to add addresses to routing table"); // Attempt to add the addresses to the routing table. 
@@ -392,6 +395,7 @@ impl SwarmDriver { let _ = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); + let mut failed_peer_addresses = vec![]; // we need to decide if this was a critical error and the peer should be removed from the routing table let should_clean_peer = match error { DialError::Transport(errors) => { @@ -401,10 +405,14 @@ impl SwarmDriver { // so we default to it not being a real issue // unless there are _specific_ errors (connection refused eg) error!("Dial errors len : {:?}", errors.len()); - let mut there_is_a_serious_issue = false; - for (_addr, err) in errors { + let mut remove_peer_track_peer_issue = false; + for (addr, err) in errors { error!("OutgoingTransport error : {err:?}"); + if !failed_peer_addresses.contains(&addr) { + failed_peer_addresses.push(addr) + } + match err { TransportError::MultiaddrNotSupported(addr) => { warn!("Multiaddr not supported : {addr:?}"); @@ -414,14 +422,13 @@ impl SwarmDriver { println!("If this was your bootstrap peer, restart your node with a supported multiaddr"); } // if we can't dial a peer on a given address, we should remove it from the routing table - there_is_a_serious_issue = true + remove_peer_track_peer_issue = false } TransportError::Other(err) => { - let problematic_errors = [ - "ConnectionRefused", - "HostUnreachable", - "HandshakeTimedOut", - ]; + let problematic_errors = + ["ConnectionRefused", "HostUnreachable"]; + + let intermittent_errors = ["HandshakeTimedOut"]; let is_bootstrap_peer = self .bootstrap_peers @@ -432,7 +439,7 @@ impl SwarmDriver { && self.peers_in_rt < self.bootstrap_peers.len() { warn!("OutgoingConnectionError: On bootstrap peer {failed_peer_id:?}, while still in bootstrap mode, ignoring"); - there_is_a_serious_issue = false; + remove_peer_track_peer_issue = false; } else { // It is really difficult to match this error, due to being eg: // Custom { kind: Other, error: Left(Left(Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })) } @@ -443,13 +450,19 @@ impl SwarmDriver { .any(|err| error_msg.contains(err)) { warn!("Problematic error encountered: {error_msg}"); - there_is_a_serious_issue = true; + remove_peer_track_peer_issue = true; + } else if intermittent_errors + .iter() + .any(|err| error_msg.contains(err)) + { + warn!("Intermittent error encountered: {error_msg}"); + remove_peer_track_peer_issue = false; } } } } } - there_is_a_serious_issue + remove_peer_track_peer_issue } DialError::NoAddresses => { // We provided no address, and while we can't really blame the peer @@ -490,7 +503,7 @@ impl SwarmDriver { }; if should_clean_peer { - warn!("Tracking issue of {failed_peer_id:?}. Clearing it out for now"); + warn!("Serious issue with {failed_peer_id:?}. Clearing it out for now"); if let Some(dead_peer) = self .swarm @@ -501,6 +514,15 @@ impl SwarmDriver { self.update_on_peer_removal(*dead_peer.node.key.preimage()); } } + + if !should_clean_peer { + // lets try and redial. + for addr in failed_peer_addresses { + let (sender, _recv) = oneshot::channel(); + + self.queue_network_swarm_cmd(NetworkSwarmCmd::Dial { addr, sender }); + } + } } SwarmEvent::IncomingConnectionError { connection_id, diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 27f07bdb3e..c9244dbc46 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -78,10 +78,6 @@ use tokio::time::Duration; /// The type of quote for a selected payee. 
pub type PayeeQuote = (PeerId, RewardsAddress, PaymentQuote); -/// The count of peers that will be considered as close to a record target, -/// that a replication of the record shall be sent/accepted to/by the peer. -pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2; - /// Majority of a given group (i.e. > 1/2). #[inline] pub const fn close_group_majority() -> usize { @@ -97,17 +93,47 @@ const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300); /// Sort the provided peers by their distance to the given `NetworkAddress`. /// Return with the closest expected number of entries if has. -pub fn sort_peers_by_address<'a>( +pub fn sort_peers_by_address_and_limit<'a>( peers: &'a Vec<PeerId>, address: &NetworkAddress, expected_entries: usize, ) -> Result<Vec<&'a PeerId>> { - sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries) + sort_peers_by_key_and_limit(peers, &address.as_kbucket_key(), expected_entries) +} + +/// Sort the distances from the provided peers to the given `NetworkAddress`. +/// Returns the distances sorted from closest to farthest. +pub fn sort_peers_by_distance_to( + peers: &[PeerId], + queried_address: NetworkAddress, +) -> Vec<KBucketDistance> { + let mut sorted_distances: Vec<_> = peers + .iter() + .map(|peer| { + let addr = NetworkAddress::from_peer(*peer); + queried_address.distance(&addr) + }) + .collect(); + + sorted_distances.sort(); + + sorted_distances +} + +/// Sort the provided peers by their distance to the given `NetworkAddress`. +/// Only return peers whose distance to the address is within the given distance. +#[allow(clippy::result_large_err)] +pub fn sort_peers_by_address_and_limit_by_distance<'a>( + peers: &'a Vec<PeerId>, + address: &NetworkAddress, + distance: KBucketDistance, +) -> Result<Vec<&'a PeerId>> { + limit_peers_by_distance(peers, &address.as_kbucket_key(), distance) } /// Sort the provided peers by their distance to the given `KBucketKey`. /// Return with the closest expected number of entries if has. -pub fn sort_peers_by_key<'a, T>( +pub fn sort_peers_by_key_and_limit<'a, T>( peers: &'a Vec<PeerId>, key: &KBucketKey<T>, expected_entries: usize, @@ -144,6 +170,40 @@ pub fn sort_peers_by_key<'a, T>( Ok(sorted_peers) } +/// Only return peers closer to the key than the provided distance. +/// Their distance is measured by closeness to the given `KBucketKey`. +/// Errors if there are fewer than CLOSE_GROUP_SIZE peers to search. +#[allow(clippy::result_large_err)] +pub fn limit_peers_by_distance<'a, T>( + peers: &'a Vec<PeerId>, + key: &KBucketKey<T>, + distance: KBucketDistance, +) -> Result<Vec<&'a PeerId>> { + // Check if there are enough peers to satisfy the request. + // bail early if that's not the case + if CLOSE_GROUP_SIZE > peers.len() { + warn!("Not enough peers in the k-bucket to satisfy the request"); + return Err(NetworkError::NotEnoughPeers { + found: peers.len(), + required: CLOSE_GROUP_SIZE, + }); + } + + // Collect references to all peers whose distance to the key falls within the provided distance. 
+ let mut peers_within_distance: Vec<&PeerId> = Vec::with_capacity(peers.len()); + + for peer_id in peers { + let addr = NetworkAddress::from_peer(*peer_id); + let peer_distance = key.distance(&addr.as_kbucket_key()); + + if peer_distance < distance { + peers_within_distance.push(peer_id); + } + } + + Ok(peers_within_distance) +} #[derive(Clone, Debug)] /// API to interact with the underlying Swarm @@ -197,6 +257,13 @@ impl Network { &self.inner.local_swarm_cmd_sender } + /// Return the GetRange as determined by the internal SwarmDriver + pub async fn get_range(&self) -> Result<KBucketDistance> { + let (sender, receiver) = oneshot::channel(); + self.send_local_swarm_cmd(LocalSwarmCmd::GetCurrentRange { sender }); + receiver.await.map_err(NetworkError::from) + } + /// Signs the given data with the node's keypair. pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> { self.keypair().sign(msg).map_err(NetworkError::from) @@ -220,19 +287,121 @@ impl Network { receiver.await? } + /// Replicate a fresh record to its close group peers. + /// This should not be triggered by a record we receive via replication fetch + pub async fn replicate_valid_fresh_record(&self, paid_key: RecordKey, record_type: RecordType) { + let network = self; + + let start = std::time::Instant::now(); + let pretty_key = PrettyPrintRecordKey::from(&paid_key); + + // first we wait until our own network store can return the record + // otherwise it may not be fully written yet + let mut retry_count = 0; + trace!("Checking we have successfully stored the fresh record {pretty_key:?} in the store before replicating"); + loop { + let record = match network.get_local_record(&paid_key).await { + Ok(record) => record, + Err(err) => { + error!( + "Replicating fresh record {pretty_key:?} get_record_from_store errored: {err:?}" + ); + None + } + }; + + if record.is_some() { + break; + } + + if retry_count > 10 { + error!( + "Could not get record from store for replication: {pretty_key:?} after 10 retries" + ); + return; + } + + retry_count += 1; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + trace!("Start replication of fresh record {pretty_key:?} from store"); + + let all_peers = match network.get_all_local_peers_excluding_self().await { + Ok(peers) => peers, + Err(err) => { + error!( + "Replicating fresh record {pretty_key:?} get_all_local_peers errored: {err:?}" + ); + return; + } + }; + + let data_addr = NetworkAddress::from_record_key(&paid_key); + let mut peers_to_replicate_to = match network.get_range().await { + Err(error) => { + error!("Replicating fresh record {pretty_key:?} get_range errored: {error:?}"); + + return; + } + + Ok(our_get_range) => { + match sort_peers_by_address_and_limit_by_distance( + &all_peers, + &data_addr, + our_get_range, + ) { + Ok(result) => result, + Err(err) => { + error!("When replicating fresh record {pretty_key:?}, sort error: {err:?}"); + return; + } + } + } + }; + + if peers_to_replicate_to.len() < CLOSE_GROUP_SIZE { + warn!( + "Replicating fresh record {pretty_key:?} current GetRange insufficient for secure replication. 
Falling back to CLOSE_GROUP_SIZE" + ); + + peers_to_replicate_to = + match sort_peers_by_address_and_limit(&all_peers, &data_addr, CLOSE_GROUP_SIZE) { + Ok(result) => result, + Err(err) => { + error!("When replicating fresh record {pretty_key:?}, sort error: {err:?}"); + return; + } + }; + } + + let our_peer_id = network.peer_id(); + let our_address = NetworkAddress::from_peer(our_peer_id); + #[allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress + let keys = vec![(data_addr.clone(), record_type.clone())]; + + for peer_id in &peers_to_replicate_to { + trace!("Replicating fresh record {pretty_key:?} to {peer_id:?}"); + let request = Request::Cmd(Cmd::Replicate { + holder: our_address.clone(), + keys: keys.clone(), + }); + + network.send_req_ignore_reply(request, **peer_id); + } + trace!( + "Completed replicate fresh record {pretty_key:?} to {:?} peers on store, in {:?}", + peers_to_replicate_to.len(), + start.elapsed() + ); + } + /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. /// Excludes the client's `PeerId` while calculating the closest peers. pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result> { self.get_closest_peers(key, true).await } - /// Returns the closest peers to the given `NetworkAddress`, sorted by their distance to the key. - /// - /// Includes our node's `PeerId` while calculating the closest peers. - pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result> { - self.get_closest_peers(key, false).await - } - /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that /// bucket. /// Does not include self @@ -245,10 +414,10 @@ impl Network { } /// Returns all the PeerId from all the KBuckets from our local Routing Table - /// Also contains our own PeerId. - pub async fn get_closest_k_value_local_peers(&self) -> Result> { + /// Excludes our own PeerId. + pub async fn get_all_local_peers_excluding_self(&self) -> Result> { let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender }); + self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalPeersExcludingSelf { sender }); receiver .await @@ -498,6 +667,10 @@ impl Network { key: RecordKey, cfg: &GetRecordCfg, ) -> Result { + use std::collections::BTreeSet; + + use sn_transfers::SignedSpend; + let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration()); backoff::future::retry( backoff::ExponentialBackoff { @@ -528,7 +701,7 @@ impl Network { Err(GetRecordError::RecordDoesNotMatch(_)) => { warn!("The returned record does not match target {pretty_key:?}."); } - Err(GetRecordError::NotEnoughCopies { expected, got, .. }) => { + Err(GetRecordError::NotEnoughCopiesInRange { expected, got, .. }) => { warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}."); } // libp2p RecordNotFound does mean no holders answered. @@ -537,8 +710,39 @@ impl Network { Err(GetRecordError::RecordNotFound) => { warn!("No holder of record '{pretty_key:?}' found."); } - Err(GetRecordError::SplitRecord { .. 
}) => { + Err(GetRecordError::SplitRecord { result_map }) => { error!("Encountered a split record for {pretty_key:?}."); + + // attempt to deserialise and accumulate any spends + let mut accumulated_spends = BTreeSet::new(); + let results_count = result_map.len(); + // try and accumulate any SpendAttempts + if results_count > 1 { + info!("For record {pretty_key:?}, we have more than one result returned."); + // Allow for early bail if we've already seen a split SpendAttempt + for (record, _) in result_map.values() { + match get_raw_signed_spends_from_record(record) { + Ok(spends) => { + accumulated_spends.extend(spends); + } + Err(_) => { + continue; + } + } + } + } + + // we have a Double SpendAttempt and will exit + if accumulated_spends.len() > 1 { + info!("For record {pretty_key:?}, found split record for a spend; accumulating spends and returning a double spend error"); + let accumulated_spends = + accumulated_spends.into_iter().collect::<Vec<SignedSpend>>(); + + return Err(backoff::Error::Permanent(NetworkError::DoubleSpendAttempt( + accumulated_spends, + ))); + } + } Err(GetRecordError::QueryTimeout) => { error!("Encountered query timeout for {pretty_key:?}."); @@ -903,7 +1107,7 @@ impl Network { debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"); } - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; + let closest_peers = sort_peers_by_address_and_limit(&closest_peers, key, CLOSE_GROUP_SIZE)?; Ok(closest_peers.into_iter().cloned().collect()) } diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index f3f4986134..3d82c944fb 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -8,7 +8,6 @@ use crate::target_arch::Instant; use libp2p::{kad::KBucketKey, PeerId}; -use rand::{thread_rng, Rng}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; use std::collections::{btree_map::Entry, BTreeMap}; @@ -52,13 +51,13 @@ impl NetworkDiscovery { } /// The result from the kad::GetClosestPeers are again used to update our kbucket. - pub(crate) fn handle_get_closest_query(&mut self, closest_peers: Vec<PeerId>) { + pub(crate) fn handle_get_closest_query(&mut self, closest_peers: &[PeerId]) { let now = Instant::now(); let candidates_map: BTreeMap<u32, Vec<NetworkAddress>> = closest_peers - .into_iter() + .iter() .filter_map(|peer| { - let peer = NetworkAddress::from_peer(peer); + let peer = NetworkAddress::from_peer(*peer); let peer_key = peer.as_kbucket_key(); peer_key .distance(&self.self_key) @@ -83,18 +82,28 @@ impl NetworkDiscovery { /// Returns one random candidate per bucket. Also tries to refresh the candidate list. /// Todo: Limit the candidates to return. Favor the closest buckets. 
- pub(crate) fn candidates(&mut self) -> Vec<&NetworkAddress> { - self.try_refresh_candidates(); - - let mut rng = thread_rng(); + pub(crate) fn candidates(&mut self) -> Vec { let mut op = Vec::with_capacity(self.candidates.len()); - let candidates = self.candidates.values().filter_map(|candidates| { - // get a random index each time - let random_index = rng.gen::() % candidates.len(); - candidates.get(random_index) - }); - op.extend(candidates); + let mut generate_fresh_candidates = false; + for addresses in self.candidates.values_mut() { + // get a random candidate from each bucket each time + if addresses.is_empty() { + generate_fresh_candidates = true; + continue; + } + + // remove the first each time + let address = addresses.remove(0); + op.push(address); + } + + if generate_fresh_candidates { + // we only refresh when we are running low on candidates + self.try_refresh_candidates(); + } + + debug!("Candidates returned: {}", op.len()); op } diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index ee4e413c5e..0551fc03f2 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -90,7 +90,7 @@ pub struct NodeRecordStore { /// ilog2 distance range of responsible records /// AKA: how many buckets of data do we consider "close" /// None means accept all records. - responsible_distance_range: Option, + responsible_distance_range: Option, #[cfg(feature = "open-metrics")] /// Used to report the number of records held by the store to the metrics server. record_count_metric: Option, @@ -315,11 +315,6 @@ impl NodeRecordStore { self } - /// Returns the current distance ilog2 (aka bucket) range of CLOSE_GROUP nodes. - pub fn get_responsible_distance_range(&self) -> Option { - self.responsible_distance_range - } - // Converts a Key into a Hex string. fn generate_filename(key: &Key) -> String { hex::encode(key.as_ref()) @@ -474,8 +469,7 @@ impl NodeRecordStore { let mut removed_keys = Vec::new(); self.records.retain(|key, _val| { let kbucket_key = KBucketKey::new(key.to_vec()); - let is_in_range = - responsible_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0); + let is_in_range = responsible_range >= self.local_key.distance(&kbucket_key); if !is_in_range { removed_keys.push(key.clone()); } @@ -699,7 +693,7 @@ impl NodeRecordStore { pub fn get_records_within_distance_range( &self, records: HashSet<&Key>, - distance_range: u32, + distance_range: Distance, ) -> usize { debug!( "Total record count is {:?}. Distance is: {distance_range:?}", @@ -710,7 +704,7 @@ impl NodeRecordStore { .iter() .filter(|key| { let kbucket_key = KBucketKey::new(key.to_vec()); - distance_range >= self.local_key.distance(&kbucket_key).ilog2().unwrap_or(0) + distance_range >= self.local_key.distance(&kbucket_key) }) .count(); @@ -719,8 +713,8 @@ impl NodeRecordStore { } /// Setup the distance range. 
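The store-side changes above switch the responsible range from an ilog2 bucket (a u32) to the full kademlia distance, so records are kept or pruned by comparing raw XOR distances against that range. A minimal standalone sketch of that comparison (not part of the patch; u128 XOR stands in for the 256-bit kademlia distance, and the function name is illustrative):

```rust
/// Standalone sketch: keep only the keys whose XOR distance from our local key
/// is within the responsible range, mirroring the store's in-range retention check.
fn keys_within_range(local: u128, keys: &[u128], responsible_range: u128) -> Vec<u128> {
    keys.iter()
        .copied()
        .filter(|key| (local ^ *key) <= responsible_range)
        .collect()
}

fn main() {
    let local = 0b1010_0000u128;
    let keys = [0b1010_0001u128, 0b1110_0000, 0b0000_1111];
    // Only the key whose XOR distance from us is within the range survives.
    let kept = keys_within_range(local, &keys, 0b0000_1111);
    assert_eq!(kept, vec![0b1010_0001u128]);
}
```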
- pub(crate) fn set_responsible_distance_range(&mut self, farthest_responsible_bucket: u32) { - self.responsible_distance_range = Some(farthest_responsible_bucket); + pub(crate) fn set_responsible_distance_range(&mut self, farthest_distance: Distance) { + self.responsible_distance_range = Some(farthest_distance); } } @@ -1500,10 +1494,7 @@ mod tests { .wrap_err("Could not parse record store key")?, ); // get the distance to this record from our local key - let distance = self_address - .distance(&halfway_record_address) - .ilog2() - .unwrap_or(0); + let distance = self_address.distance(&halfway_record_address); // must be plus one bucket from the halfway record store.set_responsible_distance_range(distance); diff --git a/sn_networking/src/record_store_api.rs b/sn_networking/src/record_store_api.rs index 8e3bc67364..64fd790ccd 100644 --- a/sn_networking/src/record_store_api.rs +++ b/sn_networking/src/record_store_api.rs @@ -10,7 +10,7 @@ use crate::record_store::{ClientRecordStore, NodeRecordStore}; use libp2p::kad::{ store::{RecordStore, Result}, - ProviderRecord, Record, RecordKey, + KBucketDistance, ProviderRecord, Record, RecordKey, }; use sn_evm::{AttoTokens, QuotingMetrics}; use sn_protocol::{storage::RecordType, NetworkAddress}; @@ -130,17 +130,7 @@ impl UnifiedRecordStore { } } - pub(crate) fn get_farthest_replication_distance_bucket(&self) -> Option { - match self { - Self::Client(_store) => { - warn!("Calling get_distance_range at Client. This should not happen"); - None - } - Self::Node(store) => store.get_responsible_distance_range(), - } - } - - pub(crate) fn set_distance_range(&mut self, distance: u32) { + pub(crate) fn set_distance_range(&mut self, distance: KBucketDistance) { match self { Self::Client(_store) => { warn!("Calling set_distance_range at Client. This should not happen"); diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs index 1b90ac9a53..5e0d3a3ad4 100644 --- a/sn_networking/src/replication_fetcher.rs +++ b/sn_networking/src/replication_fetcher.rs @@ -8,7 +8,9 @@ #![allow(clippy::mutable_key_type)] use crate::target_arch::spawn; +use crate::CLOSE_GROUP_SIZE; use crate::{event::NetworkEvent, target_arch::Instant}; +use itertools::Itertools; use libp2p::{ kad::{KBucketDistance as Distance, RecordKey, K_VALUE}, PeerId, @@ -41,8 +43,8 @@ pub(crate) struct ReplicationFetcher { // Avoid fetching same chunk from different nodes AND carry out too many parallel tasks. on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>, event_sender: mpsc::Sender, - /// ilog2 bucket distance range that the incoming key shall be fetched - distance_range: Option, + /// KBucketDistance range that the incoming key shall be fetched + distance_range: Option, /// Restrict fetch range to closer than this value /// used when the node is full, but we still have "close" data coming in /// that is _not_ closer than our farthest max record @@ -63,7 +65,7 @@ impl ReplicationFetcher { } /// Set the distance range. 
- pub(crate) fn set_replication_distance_range(&mut self, distance_range: u32) { + pub(crate) fn set_replication_distance_range(&mut self, distance_range: Distance) { self.distance_range = Some(distance_range); } @@ -76,6 +78,7 @@ impl ReplicationFetcher { holder: PeerId, incoming_keys: Vec<(NetworkAddress, RecordType)>, locally_stored_keys: &HashMap, + all_local_peers: &[PeerId], ) -> Vec<(PeerId, RecordKey)> { // remove locally stored from incoming_keys let mut new_incoming_keys: Vec<_> = incoming_keys @@ -133,12 +136,30 @@ impl ReplicationFetcher { .retain(|_, time_out| *time_out > Instant::now()); let mut out_of_range_keys = vec![]; + // Filter out those out_of_range ones among the incoming_keys. if let Some(ref distance_range) = self.distance_range { new_incoming_keys.retain(|(addr, _record_type)| { - let is_in_range = - self_address.distance(addr).ilog2().unwrap_or(0) <= *distance_range; + // find all closer peers to the data + let closer_peers_len = all_local_peers + .iter() + .filter(|peer_id| { + let peer_address = NetworkAddress::from_peer(**peer_id); + addr.distance(&peer_address) <= *distance_range + }) + .collect_vec() + .len(); + + // we consider ourselves in range if + // A) We don't know enough closer peers than ourselves + // or B) The distance to the data is within our GetRange + let is_in_range = closer_peers_len <= CLOSE_GROUP_SIZE + || self_address.distance(addr).ilog2() <= distance_range.ilog2(); if !is_in_range { + warn!( + "Rejecting incoming key: {addr:?} as out of range. {:?} is larger than {:?} ", + self_address.distance(addr).ilog2(), + distance_range.ilog2()); out_of_range_keys.push(addr.clone()); } is_in_range @@ -428,8 +449,12 @@ mod tests { incoming_keys.push((key, RecordType::Chunk)); }); - let keys_to_fetch = - replication_fetcher.add_keys(PeerId::random(), incoming_keys, &locally_stored_keys); + let keys_to_fetch = replication_fetcher.add_keys( + PeerId::random(), + incoming_keys, + &locally_stored_keys, + &[], + ); assert_eq!(keys_to_fetch.len(), MAX_PARALLEL_FETCH); // we should not fetch anymore keys @@ -441,6 +466,7 @@ mod tests { PeerId::random(), vec![(key_1, RecordType::Chunk), (key_2, RecordType::Chunk)], &locally_stored_keys, + &[], ); assert!(keys_to_fetch.is_empty()); @@ -451,6 +477,7 @@ mod tests { PeerId::random(), vec![(key, RecordType::Chunk)], &locally_stored_keys, + &[], ); assert!(!keys_to_fetch.is_empty()); @@ -476,34 +503,41 @@ mod tests { let mut replication_fetcher = ReplicationFetcher::new(peer_id, event_sender); // Set distance range + // way to update this test let distance_target = NetworkAddress::from_peer(PeerId::random()); - let distance_range = self_address.distance(&distance_target).ilog2().unwrap_or(1); + let distance_range = self_address.distance(&distance_target); replication_fetcher.set_replication_distance_range(distance_range); + // generate a list of close peers + let close_peers = (0..100).map(|_| PeerId::random()).collect::>(); + let mut incoming_keys = Vec::new(); let mut in_range_keys = 0; (0..100).for_each(|_| { let random_data: Vec = (0..50).map(|_| rand::random::()).collect(); let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); - if key.distance(&self_address).ilog2().unwrap_or(0) <= distance_range { + if key.distance(&self_address).ilog2() <= distance_range.ilog2() { in_range_keys += 1; } incoming_keys.push((key, RecordType::Chunk)); }); - let keys_to_fetch = - replication_fetcher.add_keys(PeerId::random(), incoming_keys, &Default::default()); + let keys_to_fetch = 
replication_fetcher.add_keys( + PeerId::random(), + incoming_keys, + &Default::default(), + &close_peers, + ); assert_eq!( keys_to_fetch.len(), replication_fetcher.on_going_fetches.len(), "keys to fetch and ongoing fetches should match" ); - assert_eq!( - in_range_keys, - keys_to_fetch.len() + replication_fetcher.to_be_fetched.len(), - "all keys should be in range and in the fetcher" + assert!( + keys_to_fetch.len() + replication_fetcher.to_be_fetched.len() >= in_range_keys, + "at least all keys in range should be in the fetcher" ); } } diff --git a/sn_networking/src/transfers.rs b/sn_networking/src/transfers.rs index 76b6349ce1..40c6182f94 100644 --- a/sn_networking/src/transfers.rs +++ b/sn_networking/src/transfers.rs @@ -6,9 +6,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{ - close_group_majority, driver::GetRecordCfg, GetRecordError, Network, NetworkError, Result, -}; +use crate::{driver::GetRecordCfg, Network, NetworkError, Result}; use libp2p::kad::{Quorum, Record}; use sn_protocol::{ storage::{try_deserialize_record, RecordHeader, RecordKind, RetryStrategy, SpendAddress}, @@ -39,7 +37,7 @@ impl Network { }; let record = self.get_record_from_network(key.clone(), &get_cfg).await?; debug!( - "Got record from the network, {:?}", + "Got raw spends from the network, {:?}", PrettyPrintRecordKey::from(&record.key) ); get_raw_signed_spends_from_record(&record) @@ -51,38 +49,14 @@ impl Network { /// If we get a quorum error, we increase the RetryStrategy pub async fn get_spend(&self, address: SpendAddress) -> Result { let key = NetworkAddress::from_spend_address(address).to_record_key(); - let mut get_cfg = GetRecordCfg { + let get_cfg = GetRecordCfg { get_quorum: Quorum::All, retry_strategy: Some(RetryStrategy::Quick), target_record: None, expected_holders: Default::default(), is_register: false, }; - let record = match self.get_record_from_network(key.clone(), &get_cfg).await { - Ok(record) => record, - Err(NetworkError::GetRecordError(GetRecordError::NotEnoughCopies { - record, - expected, - got, - })) => { - // if majority holds the spend, it might be worth to be trusted. - if got >= close_group_majority() { - debug!("At least a majority nodes hold the spend {address:?}, going to trust it if can fetch with majority again."); - get_cfg.get_quorum = Quorum::Majority; - get_cfg.retry_strategy = Some(RetryStrategy::Balanced); - self.get_record_from_network(key, &get_cfg).await? 
- } else { - return Err(NetworkError::GetRecordError( - GetRecordError::NotEnoughCopies { - record, - expected, - got, - }, - )); - } - } - Err(err) => return Err(err), - }; + let record = self.get_record_from_network(key.clone(), &get_cfg).await?; debug!( "Got record from the network, {:?}", PrettyPrintRecordKey::from(&record.key) diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index 3f3343f403..b0dd3f6857 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -552,7 +552,7 @@ impl Node { }; debug!( - "Got {} validated spends with key: {unique_pubkey:?} at {pretty_key:?}", + "Found {} spends with key: {unique_pubkey:?} at {pretty_key:?}", validated_spends.len() ); @@ -564,14 +564,12 @@ impl Node { expires: None, }; self.network().put_local_record(record); - debug!( - "Successfully stored validated spends with key: {unique_pubkey:?} at {pretty_key:?}" - ); + debug!("Successfully stored spends with key: {unique_pubkey:?} at {pretty_key:?}"); // Just log the double spend attempt. DoubleSpend error during PUT is not used and would just lead to // RecordRejected marker (which is incorrect, since we store double spends). if validated_spends.len() > 1 { - warn!("Got double spend(s) of len {} for the Spend PUT with unique_pubkey {unique_pubkey}", validated_spends.len()); + warn!("Got Burnt SpendAttempts of len {} for the Spend PUT with unique_pubkey {unique_pubkey} at {pretty_key:?}", validated_spends.len()); } self.record_metrics(Marker::ValidSpendRecordPutFromNetwork(&pretty_key)); @@ -756,13 +754,14 @@ impl Node { } spends } - Err(NetworkError::GetRecordError(GetRecordError::NotEnoughCopies { + Err(NetworkError::GetRecordError(GetRecordError::NotEnoughCopiesInRange { record, got, + range, .. })) => { info!( - "Retrieved {got} copies of the record for {unique_pubkey:?} from the network" + "Retrieved {got} copies of the record for {unique_pubkey:?} from the network in range {range}" ); match get_raw_signed_spends_from_record(&record) { Ok(spends) => spends, diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs index 59e0cff078..80ec25b157 100644 --- a/sn_node/src/replication.rs +++ b/sn_node/src/replication.rs @@ -6,15 +6,18 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{error::Result, node::Node}; +use crate::{ + error::{Error, Result}, + node::Node, +}; use libp2p::{ kad::{Quorum, Record, RecordKey}, PeerId, }; -use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATION_PEERS_COUNT}; +use sn_networking::{GetRecordCfg, Network}; use sn_protocol::{ - messages::{Cmd, Query, QueryResponse, Request, Response}, - storage::RecordType, + messages::{Query, QueryResponse, Request, Response}, + storage::{try_serialize_record, RecordKind, RecordType}, NetworkAddress, PrettyPrintRecordKey, }; use tokio::task::spawn; @@ -79,12 +82,27 @@ impl Node { // Hence value of the flag actually doesn't matter. 
is_register: false, }; - match node.network().get_record_from_network(key, &get_cfg).await { + match node + .network() + .get_record_from_network(key.clone(), &get_cfg) + .await + { Ok(record) => record, - Err(err) => { - error!("During replication fetch of {pretty_key:?}, failed in re-attempt of get from network {err:?}"); - return; - } + Err(error) => match error { + sn_networking::NetworkError::DoubleSpendAttempt(spends) => { + debug!("Failed to fetch record {pretty_key:?} from the network, double spend attempt {spends:?}"); + + let bytes = try_serialize_record(&spends, RecordKind::Spend)?; + + Record { + key, + value: bytes.to_vec(), + publisher: None, + expires: None, + } + } + other_error => return Err(other_error.into()), + }, } }; @@ -96,6 +114,7 @@ impl Node { } else { debug!("Completed storing Replication Record {pretty_key:?} from network."); } + Ok::<(), Error>(()) }); } Ok(()) @@ -111,86 +130,9 @@ impl Node { let network = self.network().clone(); let _handle = spawn(async move { - let start = std::time::Instant::now(); - let pretty_key = PrettyPrintRecordKey::from(&paid_key); - - // first we wait until our own network store can return the record - // otherwise it may not be fully written yet - let mut retry_count = 0; - debug!("Checking we have successfully stored the fresh record {pretty_key:?} in the store before replicating"); - loop { - let record = match network.get_local_record(&paid_key).await { - Ok(record) => record, - Err(err) => { - error!( - "Replicating fresh record {pretty_key:?} get_record_from_store errored: {err:?}" - ); - None - } - }; - - if record.is_some() { - break; - } - - if retry_count > 10 { - error!( - "Could not get record from store for replication: {pretty_key:?} after 10 retries" - ); - return; - } - - retry_count += 1; - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - debug!("Start replication of fresh record {pretty_key:?} from store"); - - // Already contains self_peer_id - let mut closest_k_peers = match network.get_closest_k_value_local_peers().await { - Ok(peers) => peers, - Err(err) => { - error!("Replicating fresh record {pretty_key:?} get_closest_local_peers errored: {err:?}"); - return; - } - }; - - // remove ourself from these calculations - closest_k_peers.retain(|peer_id| peer_id != &network.peer_id()); - - let data_addr = NetworkAddress::from_record_key(&paid_key); - - let sorted_based_on_addr = match sort_peers_by_address( - &closest_k_peers, - &data_addr, - REPLICATION_PEERS_COUNT, - ) { - Ok(result) => result, - Err(err) => { - error!( - "When replicating fresh record {pretty_key:?}, having error when sort {err:?}" - ); - return; - } - }; - - let our_peer_id = network.peer_id(); - let our_address = NetworkAddress::from_peer(our_peer_id); - let keys = vec![(data_addr.clone(), record_type.clone())]; - - for peer_id in sorted_based_on_addr { - debug!("Replicating fresh record {pretty_key:?} to {peer_id:?}"); - let request = Request::Cmd(Cmd::Replicate { - holder: our_address.clone(), - keys: keys.clone(), - }); - - network.send_req_ignore_reply(request, *peer_id); - } - debug!( - "Completed replicate fresh record {pretty_key:?} on store, in {:?}", - start.elapsed() - ); + network + .replicate_valid_fresh_record(paid_key, record_type) + .await; }); } } diff --git a/sn_node/tests/double_spend.rs b/sn_node/tests/double_spend.rs index 8d06a87187..21ba72d619 100644 --- a/sn_node/tests/double_spend.rs +++ b/sn_node/tests/double_spend.rs @@ -13,18 +13,19 @@ // use common::client::{get_client_and_funded_wallet, 
get_wallet}; // use eyre::{bail, Result}; // use itertools::Itertools; -// use sn_transfers::{ -// get_genesis_sk, rng, NanoTokens, DerivationIndex, HotWallet, SignedTransaction, -// SpendReason, WalletError, GENESIS_CASHNOTE, -// }; // use sn_logging::LogBuilder; // use sn_networking::NetworkError; +// use sn_transfers::{ +// get_genesis_sk, rng, DerivationIndex, HotWallet, NanoTokens, SignedTransaction, SpendReason, +// WalletError, GENESIS_CASHNOTE, +// }; // use std::time::Duration; // use tracing::*; // #[tokio::test] // async fn cash_note_transfer_double_spend_fail() -> Result<()> { -// let _log_guards = LogBuilder::init_single_threaded_tokio_test("double_spend", true); +// let _log_guards = +// LogBuilder::init_single_threaded_tokio_test("cash_note_transfer_double_spend_fail", true); // // create 1 wallet add money from faucet // let first_wallet_dir = TempDir::new()?; @@ -40,7 +41,7 @@ // assert_eq!(third_wallet.balance(), NanoTokens::zero()); // // manually forge two transfers of the same source -// let amount = first_wallet_balance / 3; +// let amount = NanoTokens::from(first_wallet_balance / 3); // let to1 = first_wallet.address(); // let to2 = second_wallet.address(); // let to3 = third_wallet.address(); @@ -70,31 +71,50 @@ // )?; // // send both transfers to the network -// // upload won't error out, only error out during verification. + // info!("Sending both transfers to the network..."); -// let res = client.send_spends(transfer_to_2.spends.iter(), false).await; -// assert!(res.is_ok()); -// let res = client.send_spends(transfer_to_3.spends.iter(), false).await; -// assert!(res.is_ok()); +// // These may error (but may not depending on network speed) +// // so we're not going to rely on it here. +// let _ = client.send_spends(transfer_to_2.spends.iter(), true).await; -// // we wait 5s to ensure that the double spend attempt is detected and accumulated -// info!("Verifying the transfers from first wallet... Sleeping for 10 seconds."); -// tokio::time::sleep(Duration::from_secs(10)).await; +// let _ = client.send_spends(transfer_to_3.spends.iter(), true).await; + +// // check the CashNotes, it should fail +// info!("Verifying the transfers from first wallet..."); // let cash_notes_for_2: Vec<_> = transfer_to_2.output_cashnotes.clone(); // let cash_notes_for_3: Vec<_> = transfer_to_3.output_cashnotes.clone(); -// // check the CashNotes, it should fail -// let should_err1 = client.verify_cashnote(&cash_notes_for_2[0]).await; -// let should_err2 = client.verify_cashnote(&cash_notes_for_3[0]).await; -// info!("Both should fail during GET record accumulation : {should_err1:?} {should_err2:?}"); +// let mut should_err1 = client.verify_cashnote(&cash_notes_for_2[0]).await; +// let mut should_err2 = client.verify_cashnote(&cash_notes_for_3[0]).await; + +// for i in 0..5 { +// if should_err1.is_err() && should_err2.is_err() { +// break; +// } + +// tokio::time::sleep(Duration::from_secs(1)).await; +// info!("Retrying verification.{i}... for should_err1+2"); +// println!("Retrying verification{i} ... 
for should_err1+2"); +// should_err1 = client.verify_cashnote(&cash_notes_for_2[0]).await; +// should_err2 = client.verify_cashnote(&cash_notes_for_3[0]).await; +// } + +// info!("Both should fail during GET record accumulation + Double SpendAttempt should be flagged: {should_err1:?} {should_err2:?}"); +// println!("Both should fail during GET record accumulation + Double SpendAttempt should be flagged: {should_err1:?} {should_err2:?}"); // assert!(should_err1.is_err() && should_err2.is_err()); -// assert_matches!(should_err1, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); -// assert_matches!(should_err2, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); + +// assert_eq!( +// format!("{should_err1:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpend error, was: {should_err1:?}" +// ); + +// assert_eq!( +// format!("{should_err2:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpend error, was: {should_err2:?}" +// ); // Ok(()) // } @@ -168,7 +188,7 @@ // )?; // // send the transfer to the network which should reject it -// let res = client.send_spends(transfer2.spends.iter(), false).await; +// let res = client.send_spends(transfer2.spends.iter(), true).await; // std::mem::drop(exclusive_access); // assert_matches!(res, Err(WalletError::CouldNotSendMoney(_))); @@ -184,8 +204,8 @@ // let wallet_dir_1 = TempDir::new()?; // let (client, mut wallet_1) = get_client_and_funded_wallet(wallet_dir_1.path()).await?; -// let balance_1 = wallet_1.balance(); -// let amount = balance_1 / 2; +// let balance_1 = wallet_1.balance().as_nano(); +// let amount = NanoTokens::from(balance_1 / 2); // let to1 = wallet_1.address(); // // Send from 1 -> 2 @@ -262,14 +282,18 @@ // reason.clone(), // wallet_1.key(), // )?; // reuse the old cash notes -// client -// .send_spends(transfer_to_3.spends.iter(), false) -// .await?; +// // ignore response in case it errors out early, we verify below +// let _res = client.send_spends(transfer_to_3.spends.iter(), true).await; // info!("Verifying the transfers from 1 -> 3 wallet... It should error out."); // let cash_notes_for_3: Vec<_> = transfer_to_3.output_cashnotes.clone(); -// assert!(client.verify_cashnote(&cash_notes_for_3[0]).await.is_err()); // the old spend has been poisoned + +// let res = client.verify_cashnote(&cash_notes_for_3[0]).await; +// assert!(res.is_err(), "should be error, was {res:?}"); // the old spend has been poisoned + // info!("Verifying the original transfers from 1 -> 2 wallet... 
It should error out."); -// assert!(client.verify_cashnote(&cash_notes_for_2[0]).await.is_err()); // the old spend has been poisoned + +// let res = client.verify_cashnote(&cash_notes_for_2[0]).await; +// assert!(res.is_err(), "should be error, was {res:?}"); // the old spend has been poisoned // // The old spend has been poisoned, but spends from 22 -> 222 should still work // let wallet_dir_222 = TempDir::new()?; @@ -300,16 +324,16 @@ // client.verify_cashnote(&cash_notes_for_222[0]).await?; // // finally assert that we have a double spend attempt error here -// // we wait 1s to ensure that the double spend attempt is detected and accumulated +// // we wait to ensure that the double spend attempt is detected and accumulated // tokio::time::sleep(Duration::from_secs(5)).await; // match client.verify_cashnote(&cash_notes_for_2[0]).await { // Ok(_) => bail!("Cashnote verification should have failed"), // Err(e) => { -// assert!( -// e.to_string() -// .contains("Network Error Double spend(s) attempt was detected"), -// "error should reflect double spend attempt", +// assert_eq!( +// e.to_string(), +// format!("{}", WalletError::BurntSpend), +// "error should reflect double spend attempt was: {e:?}", // ); // } // } @@ -317,10 +341,10 @@ // match client.verify_cashnote(&cash_notes_for_3[0]).await { // Ok(_) => bail!("Cashnote verification should have failed"), // Err(e) => { -// assert!( -// e.to_string() -// .contains("Network Error Double spend(s) attempt was detected"), -// "error should reflect double spend attempt", +// assert_eq!( +// e.to_string(), +// format!("{}", WalletError::BurntSpend), +// "error should reflect double spend attempt was: {e:?}", // ); // } // } @@ -339,7 +363,7 @@ // let (client, mut wallet_a) = get_client_and_funded_wallet(wallet_dir_a.path()).await?; // let balance_a = wallet_a.balance().as_nano(); -// let amount = balance_a / 2; +// let amount = NanoTokens::from(balance_a / 2); // // Send from A -> B // let wallet_dir_b = TempDir::new()?; @@ -428,12 +452,10 @@ // let result = client.verify_cashnote(&cash_notes_for_x[0]).await; // info!("Got result while verifying double spend from A -> X: {result:?}"); -// // sleep for a bit to allow the network to process and accumulate the double spend -// tokio::time::sleep(Duration::from_secs(10)).await; - -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); // poisoned +// assert!( +// format!("{result:?}").starts_with("Err(UnexpectedParentSpends"), +// "Should have been UnexpectedParentSpends error, was: {result:?}" +// ); // // Try to double spend from B -> Y // let wallet_dir_y = TempDir::new()?; @@ -470,32 +492,48 @@ // let result = client.verify_cashnote(&cash_notes_for_y[0]).await; // info!("Got result while verifying double spend from B -> Y: {result:?}"); -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpent error, was: {result:?}" +// ); // info!("Verifying the original cashnote of A -> B"); + +// // arbitrary time sleep to allow for network accumulation of double spend. 
+// tokio::time::sleep(Duration::from_secs(1)).await; + // let result = client.verify_cashnote(&cash_notes_for_b[0]).await; // info!("Got result while verifying the original spend from A -> B: {result:?}"); -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpent error, was: {result:?}" +// ); + +// println!("Verifying the original cashnote of B -> C"); -// info!("Verifying the original cashnote of B -> C"); // let result = client.verify_cashnote(&cash_notes_for_c[0]).await; // info!("Got result while verifying the original spend from B -> C: {result:?}"); -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }, "result should be verify error, it was {result:?}"); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpent error, was: {result:?}" +// ); // let result = client.verify_cashnote(&cash_notes_for_y[0]).await; -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }, "result should be verify error, it was {result:?}"); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpent error, was: {result:?}" +// ); + // let result = client.verify_cashnote(&cash_notes_for_b[0]).await; -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }, "result should be verify error, it was {result:?}"); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend), +// "Should have been BurntSpent error, was: {result:?}" +// ); // Ok(()) // } @@ -511,8 +549,8 @@ // let wallet_dir_a = TempDir::new()?; // let (client, mut wallet_a) = get_client_and_funded_wallet(wallet_dir_a.path()).await?; -// let balance_a = wallet_a.balance(); -// let amount = balance_a / 2; +// let balance_a = wallet_a.balance().as_nano(); +// let amount = NanoTokens::from(balance_a / 2); // // Send from A -> B // let wallet_dir_b = TempDir::new()?; @@ -574,7 +612,7 @@ // )?; // client -// .send_spends(transfer_to_c.spends.iter(), false) +// .send_spends(transfer_to_c.spends.iter(), true) // .await?; // info!("Verifying the transfers from B -> C wallet..."); @@ -611,9 +649,10 @@ // let result = client.verify_cashnote(&cash_notes_for_x[0]).await; // info!("Got result while verifying double spend from A -> X: {result:?}"); -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend) +// ); // // the original A should still be present as one of the double spends // let res = client @@ -649,20 +688,23 @@ // reason.clone(), // wallet_a.key(), // )?; // reuse the old cash notes -// client -// 
.send_spends(transfer_to_y.spends.iter(), false) -// .await?; + +// // we actually don't care about the result here, we just want to spam the network with double spends +// let _ = client.send_spends(transfer_to_y.spends.iter(), false).await; + +// // and then we verify the double spend attempt // info!("Verifying the transfers from A -> Y wallet... It should error out."); // let cash_notes_for_y: Vec<_> = transfer_to_y.output_cashnotes.clone(); // // sleep for a bit to allow the network to process and accumulate the double spend -// tokio::time::sleep(Duration::from_millis(500)).await; +// tokio::time::sleep(Duration::from_millis(1500)).await; // let result = client.verify_cashnote(&cash_notes_for_y[0]).await; // info!("Got result while verifying double spend from A -> Y: {result:?}"); -// assert_matches!(result, Err(WalletError::CouldNotVerifyTransfer(str)) => { -// assert!(str.starts_with("Network Error Double spend(s) attempt was detected"), "Expected double spend, but got {str}"); -// }); +// assert_eq!( +// format!("{result:?}"), +// format!("Err({:?})", WalletError::BurntSpend) +// ); // // the original A should still be present as one of the double spends // let res = client diff --git a/sn_node/tests/storage_payments.rs b/sn_node/tests/storage_payments.rs index 23fe9c53b0..d36f680ca2 100644 --- a/sn_node/tests/storage_payments.rs +++ b/sn_node/tests/storage_payments.rs @@ -14,7 +14,6 @@ // use libp2p::PeerId; // use rand::Rng; // use sn_client::{Error as ClientError, FilesDownload, Uploader, WalletClient}; -// use sn_evm::{Amount, AttoTokens, PaymentQuote}; // use sn_logging::LogBuilder; // use sn_networking::{GetRecordError, NetworkError}; // use sn_protocol::{ @@ -23,6 +22,7 @@ // NetworkAddress, // }; // use sn_registers::Permissions; +// use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote}; // use std::collections::BTreeMap; // use tokio::time::{sleep, Duration}; // use tracing::info; @@ -80,7 +80,7 @@ // let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); // let subset_len = chunks.len() / 3; -// let _storage_cost = wallet_client +// let res = wallet_client // .pay_for_storage( // chunks // .clone() @@ -88,7 +88,15 @@ // .take(subset_len) // .map(|(name, _)| NetworkAddress::ChunkAddress(ChunkAddress::new(name))), // ) -// .await?; +// .await; + +// // if the payment failed, we can log that +// if let Err(error) = res { +// tracing::warn!( +// "Payment failed, (though that doesn't really break this test): {:?}", +// error +// ); +// } // // now let's request to upload all addresses, even that we've already paid for a subset of them // let verify_store = false; @@ -111,7 +119,7 @@ // let paying_wallet_dir: TempDir = TempDir::new()?; // let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; -// let wallet_original_balance = paying_wallet.balance().as_atto(); +// let wallet_original_balance = paying_wallet.balance().as_nano(); // let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); // // generate a random number (between 50 and 100) of random addresses @@ -135,10 +143,10 @@ // .ok_or(eyre!("Total storage cost exceed possible token amount"))?; // // check we've paid only for the subset of addresses, 1 nano per addr -// let new_balance = AttoTokens::from_atto(wallet_original_balance - total_cost.as_atto()); +// let new_balance = NanoTokens::from(wallet_original_balance - total_cost.as_nano()); // info!("Verifying new balance on paying wallet is {new_balance} ..."); // let paying_wallet = 
wallet_client.into_wallet(); -// // assert_eq!(paying_wallet.balance(), new_balance);// TODO adapt to evm +// assert_eq!(paying_wallet.balance(), new_balance); // // let's verify payment proofs for the subset have been cached in the wallet // assert!(random_content_addrs @@ -160,13 +168,12 @@ // .ok_or(eyre!("Total storage cost exceed possible token amount"))?; // // check we've paid only for addresses we haven't previously paid for, 1 nano per addr -// let new_balance = AttoTokens::from_atto( -// wallet_original_balance - (Amount::from(random_content_addrs.len()) * total_cost.as_atto()), +// let new_balance = NanoTokens::from( +// wallet_original_balance - (random_content_addrs.len() as u64 * total_cost.as_nano()), // ); // println!("Verifying new balance on paying wallet is now {new_balance} ..."); // let paying_wallet = wallet_client.into_wallet(); -// // TODO adapt to evm -// // assert_eq!(paying_wallet.balance(), new_balance); +// assert_eq!(paying_wallet.balance(), new_balance); // // let's verify payment proofs now for all addresses have been cached in the wallet // // assert!(random_content_addrs @@ -229,18 +236,16 @@ // no_data_payments.insert( // *chunk_name, // ( -// sn_evm::utils::dummy_address(), -// PaymentQuote::test_dummy(*chunk_name, AttoTokens::from_u64(0)), +// MainPubkey::new(bls::SecretKey::random().public_key()), +// PaymentQuote::test_dummy(*chunk_name, NanoTokens::from(0)), // PeerId::random().to_bytes(), // ), // ); // } -// // TODO adapt to evm -// // let _ = wallet_client -// // .mut_wallet() -// // .send_storage_payment(&no_data_payments) -// // .await?; +// let _ = wallet_client +// .mut_wallet() +// .local_send_storage_payment(&no_data_payments)?; // sleep(Duration::from_secs(5)).await; @@ -248,131 +253,131 @@ // .upload_test_bytes(content_bytes.clone(), false) // .await?; -// info!("Reading {content_addr:?} expected to fail"); -// let mut files_download = FilesDownload::new(files_api); -// assert!( -// matches!( -// files_download.download_file(content_addr, None).await, -// Err(ClientError::Network(NetworkError::GetRecordError( -// GetRecordError::RecordNotFound -// ))) -// ), -// "read bytes should fail as we didn't store them" -// ); +// // info!("Reading {content_addr:?} expected to fail"); +// // let mut files_download = FilesDownload::new(files_api); +// // assert!( +// // matches!( +// // files_download.download_file(content_addr, None).await, +// // Err(ClientError::Network(NetworkError::GetRecordError( +// // GetRecordError::RecordNotFound +// // ))) +// // ), +// // "read bytes should fail as we didn't store them" +// // ); -// Ok(()) -// } +// // Ok(()) +// // } -// #[tokio::test] -// async fn storage_payment_register_creation_succeeds() -> Result<()> { -// let _log_guards = LogBuilder::init_single_threaded_tokio_test("storage_payments", true); +// // #[tokio::test] +// // async fn storage_payment_register_creation_succeeds() -> Result<()> { +// // let _log_guards = LogBuilder::init_single_threaded_tokio_test("storage_payments", true); -// let paying_wallet_dir = TempDir::new()?; +// // let paying_wallet_dir = TempDir::new()?; -// let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; -// let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); +// // let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; +// // let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); -// let mut rng = rand::thread_rng(); -// let xor_name = 
XorName::random(&mut rng); -// let address = RegisterAddress::new(xor_name, client.signer_pk()); -// let net_addr = NetworkAddress::from_register_address(address); -// info!("Paying for random Register address {net_addr:?} ..."); +// // let mut rng = rand::thread_rng(); +// // let xor_name = XorName::random(&mut rng); +// // let address = RegisterAddress::new(xor_name, client.signer_pk()); +// // let net_addr = NetworkAddress::from_register_address(address); +// // info!("Paying for random Register address {net_addr:?} ..."); -// let _cost = wallet_client -// .pay_for_storage(std::iter::once(net_addr)) -// .await?; +// // let _cost = wallet_client +// // .pay_for_storage(std::iter::once(net_addr)) +// // .await?; -// let (mut register, _cost, _royalties_fees) = client -// .create_and_pay_for_register(xor_name, &mut wallet_client, true, Permissions::default()) -// .await?; +// // let (mut register, _cost, _royalties_fees) = client +// // .create_and_pay_for_register(xor_name, &mut wallet_client, true, Permissions::default()) +// // .await?; -// println!("Newly created register has {} ops", register.read().len()); +// // println!("Newly created register has {} ops", register.read().len()); -// let retrieved_reg = client.get_register(address).await?; +// // let retrieved_reg = client.get_register(address).await?; -// assert_eq!(register.read(), retrieved_reg.read()); +// // assert_eq!(register.read(), retrieved_reg.read()); -// let random_entry = rng.gen::<[u8; 32]>().to_vec(); +// // let random_entry = rng.gen::<[u8; 32]>().to_vec(); -// register.write(&random_entry)?; +// // register.write(&random_entry)?; -// println!( -// "Register has {} ops after first write", -// register.read().len() -// ); +// // println!( +// // "Register has {} ops after first write", +// // register.read().len() +// // ); -// register.sync(&mut wallet_client, true, None).await?; +// // register.sync(&mut wallet_client, true, None).await?; -// let retrieved_reg = client.get_register(address).await?; +// // let retrieved_reg = client.get_register(address).await?; -// assert_eq!(retrieved_reg.read().iter().next().unwrap().1, random_entry); +// // assert_eq!(retrieved_reg.read().iter().next().unwrap().1, random_entry); -// assert_eq!(retrieved_reg.read().len(), 1); +// // assert_eq!(retrieved_reg.read().len(), 1); -// for index in 1..10 { -// println!("current index is {index}"); -// let random_entry = rng.gen::<[u8; 32]>().to_vec(); +// // for index in 1..10 { +// // println!("current index is {index}"); +// // let random_entry = rng.gen::<[u8; 32]>().to_vec(); -// register.write(&random_entry)?; -// register.sync(&mut wallet_client, true, None).await?; +// // register.write(&random_entry)?; +// // register.sync(&mut wallet_client, true, None).await?; -// let retrieved_reg = client.get_register(address).await?; +// // let retrieved_reg = client.get_register(address).await?; -// println!( -// "current retrieved register entry length is {}", -// retrieved_reg.read().len() -// ); -// println!("current expected entry length is {}", register.read().len()); +// // println!( +// // "current retrieved register entry length is {}", +// // retrieved_reg.read().len() +// // ); +// // println!("current expected entry length is {}", register.read().len()); -// println!( -// "current retrieved register ops length is {}", -// retrieved_reg.ops.len() -// ); -// println!("current local cached ops length is {}", register.ops.len()); +// // println!( +// // "current retrieved register ops length is {}", +// // retrieved_reg.ops.len() 
+// // ); +// // println!("current local cached ops length is {}", register.ops.len()); -// assert_eq!(retrieved_reg.read().len(), register.read().len()); +// // assert_eq!(retrieved_reg.read().len(), register.read().len()); -// assert_eq!(retrieved_reg.read().iter().next().unwrap().1, random_entry); +// // assert_eq!(retrieved_reg.read().iter().next().unwrap().1, random_entry); -// println!("Current fetched register is {:?}", retrieved_reg.register); -// println!( -// "Fetched register has update history of {}", -// retrieved_reg.register.log_update_history() -// ); +// // println!("Current fetched register is {:?}", retrieved_reg.register); +// // println!( +// // "Fetched register has update history of {}", +// // retrieved_reg.register.log_update_history() +// // ); -// std::thread::sleep(std::time::Duration::from_millis(1000)); -// } +// // std::thread::sleep(std::time::Duration::from_millis(1000)); +// // } -// Ok(()) -// } +// // Ok(()) +// // } -// #[tokio::test] -// #[ignore = "Test currently invalid as we always try to pay and upload registers if none found... need to check if this test is valid"] -// async fn storage_payment_register_creation_and_mutation_fails() -> Result<()> { -// let _log_guards = LogBuilder::init_single_threaded_tokio_test("storage_payments", true); +// // #[tokio::test] +// // #[ignore = "Test currently invalid as we always try to pay and upload registers if none found... need to check if this test is valid"] +// // async fn storage_payment_register_creation_and_mutation_fails() -> Result<()> { +// // let _log_guards = LogBuilder::init_single_threaded_tokio_test("storage_payments", true); -// let paying_wallet_dir = TempDir::new()?; +// // let paying_wallet_dir = TempDir::new()?; -// let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; -// let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); +// // let (client, paying_wallet) = get_client_and_funded_wallet(paying_wallet_dir.path()).await?; +// // let mut wallet_client = WalletClient::new(client.clone(), paying_wallet); -// let mut rng = rand::thread_rng(); -// let xor_name = XorName::random(&mut rng); -// let address = RegisterAddress::new(xor_name, client.signer_pk()); -// let net_address = -// NetworkAddress::RegisterAddress(RegisterAddress::new(xor_name, client.signer_pk())); +// // let mut rng = rand::thread_rng(); +// // let xor_name = XorName::random(&mut rng); +// // let address = RegisterAddress::new(xor_name, client.signer_pk()); +// // let net_address = +// // NetworkAddress::RegisterAddress(RegisterAddress::new(xor_name, client.signer_pk())); -// let mut no_data_payments = BTreeMap::default(); -// no_data_payments.insert( -// net_address -// .as_xorname() -// .expect("RegisterAddress should convert to XorName"), -// ( -// sn_evm::utils::dummy_address(), -// PaymentQuote::test_dummy(xor_name, AttoTokens::from_u64(0)), -// vec![], -// ), -// ); +// // let mut no_data_payments = BTreeMap::default(); +// // no_data_payments.insert( +// // net_address +// // .as_xorname() +// // .expect("RegisterAddress should convert to XorName"), +// // ( +// // sn_evm::utils::dummy_address(), +// // PaymentQuote::test_dummy(xor_name, AttoTokens::from_u64(0)), +// // vec![], +// // ), +// // ); // println!( // "current retrieved register entry length is {}", @@ -395,16 +400,16 @@ // // .send_storage_payment(&no_data_payments) // // .await?; -// // this should fail to store as the amount paid is not enough -// let (mut register, _cost, _royalties_fees) = 
client
-// .create_and_pay_for_register(xor_name, &mut wallet_client, false, Permissions::default())
-// .await?;
+// // // this should fail to store as the amount paid is not enough
+// // let (mut register, _cost, _royalties_fees) = client
+// // .create_and_pay_for_register(xor_name, &mut wallet_client, false, Permissions::default())
+// // .await?;
-// sleep(Duration::from_secs(5)).await;
-// assert!(matches!(
-// client.get_register(address).await,
-// Err(ClientError::Protocol(ProtocolError::RegisterNotFound(addr))) if *addr == address
-// ));
+// // sleep(Duration::from_secs(5)).await;
+// // assert!(matches!(
+// // client.get_register(address).await,
+// // Err(ClientError::Protocol(ProtocolError::RegisterNotFound(addr))) if *addr == address
+// // ));
// println!("Current fetched register is {:?}", retrieved_reg.address());
// println!(
@@ -415,11 +420,11 @@
// let random_entry = rng.gen::<[u8; 32]>().to_vec();
// register.write(&random_entry)?;
-// sleep(Duration::from_secs(5)).await;
-// assert!(matches!(
-// register.sync(&mut wallet_client, false, None).await,
-// Err(ClientError::Protocol(ProtocolError::RegisterNotFound(addr))) if *addr == address
-// ));
+// // sleep(Duration::from_secs(5)).await;
+// // assert!(matches!(
+// // register.sync(&mut wallet_client, false, None).await,
+// // Err(ClientError::Protocol(ProtocolError::RegisterNotFound(addr))) if *addr == address
-// Ok(())
-// }
+// // ));
+// // Ok(())
+// // }
diff --git a/sn_node/tests/verify_data_location.rs b/sn_node/tests/verify_data_location.rs
index 641756fa2c..8649d07909 100644
--- a/sn_node/tests/verify_data_location.rs
+++ b/sn_node/tests/verify_data_location.rs
@@ -16,13 +16,10 @@
 use common::{ get_all_peer_ids, get_safenode_rpc_client, NodeRestart, };
 use eyre::{eyre, Result};
-use libp2p::{ kad::{KBucketKey, RecordKey}, PeerId, };
+use libp2p::{kad::RecordKey, PeerId};
 use rand::{rngs::OsRng, Rng};
 use sn_logging::LogBuilder;
-use sn_networking::{sleep, sort_peers_by_key};
+use sn_networking::{sleep, sort_peers_by_address_and_limit, sort_peers_by_key_and_limit};
 use sn_protocol::{ safenode_proto::{NodeInfoRequest, RecordAddressesRequest}, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
@@ -160,8 +157,8 @@ fn print_node_close_groups(all_peers: &[PeerId]) {
     for (node_index, peer) in all_peers.iter().enumerate() {
         let key = NetworkAddress::from_peer(*peer).as_kbucket_key();
-        let closest_peers =
-            sort_peers_by_key(&all_peers, &key, CLOSE_GROUP_SIZE).expect("failed to sort peer");
+        let closest_peers = sort_peers_by_key_and_limit(&all_peers, &key, CLOSE_GROUP_SIZE)
+            .expect("failed to sort peer");
         let closest_peers_idx = closest_peers
             .iter()
             .map(|&&peer| {
@@ -212,11 +209,12 @@ async fn verify_location(all_peers: &Vec, node_rpc_addresses: &[SocketAd
     for (key, actual_holders_idx) in record_holders.iter() {
         println!("Verifying {:?}", PrettyPrintRecordKey::from(key));
         info!("Verifying {:?}", PrettyPrintRecordKey::from(key));
-        let record_key = KBucketKey::from(key.to_vec());
-        let expected_holders = sort_peers_by_key(all_peers, &record_key, CLOSE_GROUP_SIZE)?
-            .into_iter()
-            .cloned()
-            .collect::>();
+        let record_address = NetworkAddress::from_record_key(key);
+        let expected_holders =
+            sort_peers_by_address_and_limit(all_peers, &record_address, CLOSE_GROUP_SIZE)?
+                .into_iter()
+                .cloned()
+                .collect::>();
         let actual_holders = actual_holders_idx
             .iter()
diff --git a/sn_node/tests/verify_routing_table.rs b/sn_node/tests/verify_routing_table.rs
index da19270b69..85dc2e3a09 100644
--- a/sn_node/tests/verify_routing_table.rs
+++ b/sn_node/tests/verify_routing_table.rs
@@ -26,7 +26,7 @@ use tracing::{error, info, trace};
 /// Sleep for sometime for the nodes for discover each other before verification
 /// Also can be set through the env variable of the same name.
-const SLEEP_BEFORE_VERIFICATION: Duration = Duration::from_secs(5);
+const SLEEP_BEFORE_VERIFICATION: Duration = Duration::from_secs(60);
 #[tokio::test(flavor = "multi_thread")]
 async fn verify_routing_table() -> Result<()> {
diff --git a/sn_protocol/src/error.rs b/sn_protocol/src/error.rs
index f73c356b53..8462ff85f3 100644
--- a/sn_protocol/src/error.rs
+++ b/sn_protocol/src/error.rs
@@ -78,4 +78,7 @@ pub enum Error {
     // The record already exists at this node
     #[error("The record already exists, so do not charge for it: {0:?}")]
     RecordExists(PrettyPrintRecordKey<'static>),
+
+    #[error("Record header is incorrect")]
+    IncorrectRecordHeader,
 }
diff --git a/sn_protocol/src/storage.rs b/sn_protocol/src/storage.rs
index 2935e43fce..3a6b4ba6a8 100644
--- a/sn_protocol/src/storage.rs
+++ b/sn_protocol/src/storage.rs
@@ -18,7 +18,10 @@ use std::{str::FromStr, time::Duration};
 pub use self::{
     address::{ChunkAddress, RegisterAddress, ScratchpadAddress, SpendAddress},
     chunks::Chunk,
-    header::{try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, RecordType},
+    header::{
+        get_type_from_record, try_deserialize_record, try_serialize_record, RecordHeader,
+        RecordKind, RecordType,
+    },
     scratchpad::Scratchpad,
 };
diff --git a/sn_protocol/src/storage/header.rs b/sn_protocol/src/storage/header.rs
index 96a4515526..af43c21256 100644
--- a/sn_protocol/src/storage/header.rs
+++ b/sn_protocol/src/storage/header.rs
@@ -84,6 +84,33 @@ impl Display for RecordKind {
     }
 }
+/// Return the RecordType
+pub fn get_type_from_record(record: &Record) -> Result {
+    let key = record.key.clone();
+    let record_key = PrettyPrintRecordKey::from(&key);
+
+    match RecordHeader::from_record(record) {
+        Ok(record_header) => match record_header.kind {
+            RecordKind::Chunk => Ok(RecordType::Chunk),
+            RecordKind::Scratchpad => Ok(RecordType::Scratchpad),
+            RecordKind::Spend | RecordKind::Register => {
+                let content_hash = XorName::from_content(&record.value);
+                Ok(RecordType::NonChunk(content_hash))
+            }
+            RecordKind::ChunkWithPayment
+            | RecordKind::RegisterWithPayment
+            | RecordKind::ScratchpadWithPayment => {
+                error!("Record {record_key:?} with payment shall not be stored locally.");
+                Err(Error::IncorrectRecordHeader)
+            }
+        },
+        Err(err) => {
+            error!("For record {record_key:?}, failed to parse record_header {err:?}");
+            Err(Error::IncorrectRecordHeader)
+        }
+    }
+}
+
 impl RecordHeader {
     pub const SIZE: usize = 2;
diff --git a/sn_transfers/src/wallet/error.rs b/sn_transfers/src/wallet/error.rs
index 5a57b7434a..f60b718f42 100644
--- a/sn_transfers/src/wallet/error.rs
+++ b/sn_transfers/src/wallet/error.rs
@@ -40,9 +40,19 @@ pub enum Error {
     /// A general error when receiving a transfer fails
     #[error("Failed to receive transfer due to {0}")]
    CouldNotReceiveMoney(String),
+    /// A spend has been burnt (ie there was a DoubleSpendAttempt)
+    #[error("Failed to verify transfer validity in the network, a burnt SpendAttempt was found")]
+    BurntSpend,
+    /// Parents of a spend were not as expected in a provided cash note
#[error("Failed to verify transfer's parents in the network, transfer could be invalid or a parent double spent")] + UnexpectedParentSpends(crate::SpendAddress), + ///No valid unspent cashnotes found + #[error("All the redeemed CashNotes are already spent")] + AllRedeemedCashnotesSpent, /// A general error when verifying a transfer validity in the network #[error("Failed to verify transfer validity in the network {0}")] CouldNotVerifyTransfer(String), + /// Failed to fetch spend from network #[error("Failed to fetch spend from network: {0}")] FailedToGetSpend(String),