From 92c0a31ddf3353fd94c6b401acbe5ebb9760f671 Mon Sep 17 00:00:00 2001
From: qima
Date: Mon, 4 Nov 2024 23:56:36 +0800
Subject: [PATCH] feat(node): derive encrypt_details from self keypair

---
 .github/workflows/merge.yml       |   2 +-
 .github/workflows/nightly.yml     |   2 +-
 Cargo.lock                        |   3 +
 sn_networking/Cargo.toml          |   3 +
 sn_networking/src/driver.rs       |   9 ++
 sn_networking/src/record_store.rs | 184 ++++++++++++++++++++++++------
 6 files changed, 169 insertions(+), 34 deletions(-)

diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml
index 9142383db4..47741d4ad4 100644
--- a/.github/workflows/merge.yml
+++ b/.github/workflows/merge.yml
@@ -120,7 +120,7 @@ jobs:
       - name: Run network tests
         timeout-minutes: 25
-        run: cargo test --release --package sn_networking --features="open-metrics"
+        run: cargo test --release --package sn_networking --features="open-metrics, encrypt-records" can_store_after_restart

       - name: Run protocol tests
         timeout-minutes: 25
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 843507abff..a1e0ef2046 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -250,7 +250,7 @@ jobs:
       - name: Run network tests
         timeout-minutes: 25
-        run: cargo test --release --package sn_networking --features="open-metrics"
+        run: cargo test --release --package sn_networking --features="open-metrics, encrypt-records"

       - name: Run protocol tests
         timeout-minutes: 25
diff --git a/Cargo.lock b/Cargo.lock
index 3417d842b1..dcbc426bd6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8330,6 +8330,7 @@ name = "sn_networking"
 version = "0.19.2-rc.2"
 dependencies = [
  "aes-gcm-siv",
+ "assert_fs",
  "async-trait",
  "backoff",
  "blsttc",
@@ -8339,6 +8340,7 @@ dependencies = [
  "futures",
  "getrandom 0.2.15",
  "hex 0.4.3",
+ "hkdf",
  "hyper 0.14.30",
  "itertools 0.12.1",
  "lazy_static",
@@ -8351,6 +8353,7 @@ dependencies = [
  "rmp-serde",
  "self_encryption",
  "serde",
+ "sha2 0.10.8",
  "sn_build_info",
  "sn_evm",
  "sn_protocol",
diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml
index 2c4fa90806..df71cf51a3 100644
--- a/sn_networking/Cargo.toml
+++ b/sn_networking/Cargo.toml
@@ -73,11 +73,14 @@ tracing = { version = "~0.1.26" }
 xor_name = "5.0.0"
 backoff = { version = "0.4.0", features = ["tokio"] }
 aes-gcm-siv = "0.11.1"
+hkdf = "0.12"
+sha2 = "0.10"
 walkdir = "~2.5.0"
 strum = { version = "0.26.2", features = ["derive"] }
 void = "1.0.2"

 [dev-dependencies]
+assert_fs = "1.0.0"
 bls = { package = "blsttc", version = "8.0.1" }
 # add rand to libp2p
 libp2p-identity = { version = "0.2.7", features = ["rand"] }
diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs
index 1e52687741..2fdc2129ec 100644
--- a/sn_networking/src/driver.rs
+++ b/sn_networking/src/driver.rs
@@ -60,6 +60,7 @@ use sn_protocol::{
 use sn_registers::SignedRegister;
 use std::{
     collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
+    convert::TryInto,
     fmt::Debug,
     fs,
     io::{Read, Write},
@@ -389,10 +390,18 @@ impl NetworkBuilder {
                     source: error,
                 });
             }
+            let peer_id = PeerId::from(self.keypair.public());
+            let encryption_seed: [u8; 16] = peer_id
+                .to_bytes()
+                .get(..16)
+                .expect("Can't get encryption_seed from keypair")
+                .try_into()
+                .expect("Can't get 16 bytes from serialised key_pair");
             NodeRecordStoreConfig {
                 max_value_bytes: MAX_PACKET_SIZE, // TODO, does this need to be _less_ than MAX_PACKET_SIZE
                 storage_dir: storage_dir_path,
                 historic_quote_dir: root_dir.clone(),
+                encryption_seed,
                 ..Default::default()
             }
         };
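The driver.rs change above leans on one property: a keypair always serialises to the same PeerId, so the 16-byte seed taken from its bytes is stable across restarts. A minimal standalone sketch of that property (not part of the patch; it assumes only the libp2p-identity API already used above):

    use libp2p::identity::Keypair;
    use libp2p::PeerId;

    /// Mirror of the driver.rs logic above: take the first 16 bytes of the
    /// serialised PeerId as the record-store encryption seed.
    fn seed_from_keypair(keypair: &Keypair) -> [u8; 16] {
        let peer_id = PeerId::from(keypair.public());
        let mut seed = [0u8; 16];
        seed.copy_from_slice(&peer_id.to_bytes()[..16]);
        seed
    }

    fn main() {
        let keypair = Keypair::generate_ed25519();
        // Same keypair, same seed: the encryption details can be re-derived
        // after a restart instead of being freshly randomised each time.
        assert_eq!(seed_from_keypair(&keypair), seed_from_keypair(&keypair));
    }
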
diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs
index 149b11030a..78052a7f25 100644
--- a/sn_networking/src/record_store.rs
+++ b/sn_networking/src/record_store.rs
@@ -13,10 +13,10 @@ use crate::send_local_swarm_cmd;
 use crate::target_arch::{spawn, Instant};
 use crate::{event::NetworkEvent, log_markers::Marker};
 use aes_gcm_siv::{
-    aead::{Aead, KeyInit, OsRng},
-    Aes256GcmSiv, Nonce,
+    aead::{Aead, KeyInit},
+    Aes256GcmSiv, Key as AesKey, Nonce,
 };
-
+use hkdf::Hkdf;
 use itertools::Itertools;
 use libp2p::{
     identity::PeerId,
@@ -27,9 +27,9 @@ use libp2p::{
 };
 #[cfg(feature = "open-metrics")]
 use prometheus_client::metrics::gauge::Gauge;
-use rand::RngCore;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use serde::{Deserialize, Serialize};
+use sha2::Sha256;
 use sn_evm::{AttoTokens, QuotingMetrics};
 use sn_protocol::{
     storage::{RecordHeader, RecordKind, RecordType},
@@ -67,6 +67,27 @@ const MAX_STORE_COST: u64 = 1_000_000;
 // Min store cost for a chunk.
 const MIN_STORE_COST: u64 = 1;

+fn derive_aes256gcm_siv_from_seed(seed: &[u8; 16]) -> (Aes256GcmSiv, [u8; 4]) {
+    // The salt shall be unique per purpose.
+    let salt = b"autonomi_record_store";
+
+    let hk = Hkdf::<Sha256>::new(Some(salt), seed);
+
+    let mut okm = [0u8; 32];
+    hk.expand(b"", &mut okm)
+        .expect("32 bytes is a valid length for HKDF output");
+
+    let seeded_key = AesKey::<Aes256GcmSiv>::from_slice(&okm);
+
+    let mut nonce_starter = [0u8; 4];
+    let bytes_to_copy = seed.len().min(nonce_starter.len());
+    nonce_starter[..bytes_to_copy].copy_from_slice(&seed[..bytes_to_copy]);
+
+    println!("seeded_key is {seeded_key:?} nonce_starter is {nonce_starter:?}");
+
+    (Aes256GcmSiv::new(seeded_key), nonce_starter)
+}
+
 /// FIFO simple cache of records to reduce read times
 struct RecordCache {
     records_cache: HashMap<Key, (Record, SystemTime)>,
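The HKDF step above is deterministic: the same seed and salt always expand to the same 32-byte key. A small sketch of that behaviour, assuming the same hkdf 0.12 / sha2 0.10 crates added in Cargo.toml:

    use hkdf::Hkdf;
    use sha2::Sha256;

    /// Expand a 16-byte seed into 32 bytes of key material, the same way
    /// derive_aes256gcm_siv_from_seed does above.
    fn derive_okm(seed: &[u8; 16]) -> [u8; 32] {
        let hk = Hkdf::<Sha256>::new(Some(b"autonomi_record_store"), seed);
        let mut okm = [0u8; 32];
        hk.expand(b"", &mut okm)
            .expect("32 bytes is a valid length for HKDF-SHA256 output");
        okm
    }

    fn main() {
        // Deterministic: restarting with the same seed re-creates the same key.
        assert_eq!(derive_okm(&[1u8; 16]), derive_okm(&[1u8; 16]));
        // Seed-dependent: a different node identity gets a different key.
        assert_ne!(derive_okm(&[1u8; 16]), derive_okm(&[2u8; 16]));
    }
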
- info!("Failed to decrypt record from file {filename:?}, clean it up."); + println!( + "Failed to decrypt record from file {filename:?}, clean it up." + ); if let Err(e) = fs::remove_file(path) { - warn!( + println!( "Failed to remove outdated record file {filename:?} from storage dir: {:?}", e ); @@ -245,7 +273,7 @@ impl NodeRecordStore { } } Err(err) => { - error!("Error while reading file. filename: {filename}, error: {err:?}"); + println!("Error while reading file. filename: {filename}, error: {err:?}"); return None; } }; @@ -257,14 +285,14 @@ impl NodeRecordStore { RecordType::NonChunk(xorname_hash) } Err(error) => { - warn!( + println!( "Failed to parse record type of record {filename:?}: {:?}", error ); // In correct decryption using different key could result in this. // In that case, a cleanup shall be carried out. if let Err(e) = fs::remove_file(path) { - warn!( + println!( "Failed to remove invalid record file {filename:?} from storage dir: {:?}", e ); @@ -273,14 +301,13 @@ impl NodeRecordStore { } }; - let address = NetworkAddress::from_record_key(&key); - info!("Existing record loaded: {path:?}"); + println!("Existing record loaded: {path:?}"); return Some((key, (address, record_type))); } None }; - info!("Attempting to repopulate records from existing store..."); + println!("Attempting to repopulate records from existing store {:?} ...", config.storage_dir); let records = WalkDir::new(&config.storage_dir) .into_iter() .filter_map(|e| e.ok()) @@ -330,12 +357,8 @@ impl NodeRecordStore { network_event_sender: mpsc::Sender, swarm_cmd_sender: mpsc::Sender, ) -> Self { - let key = Aes256GcmSiv::generate_key(&mut OsRng); - let cipher = Aes256GcmSiv::new(&key); - let mut nonce_starter = [0u8; 4]; - OsRng.fill_bytes(&mut nonce_starter); - - let encryption_details = (cipher, nonce_starter); + println!("Using encryption_seed of {:?}", config.encryption_seed); + let encryption_details = derive_aes256gcm_siv_from_seed(&config.encryption_seed); // Recover the quoting_metrics first, as the historical file will be cleaned by // the later on update_records_from_an_existing_store function @@ -411,6 +434,7 @@ impl NodeRecordStore { key: &Key, encryption_details: &(Aes256GcmSiv, [u8; 4]), ) -> Option> { + println!("read back value is {bytes:?}"); let mut record = Record { key: key.clone(), value: bytes, @@ -426,13 +450,16 @@ impl NodeRecordStore { let (cipher, nonce_starter) = encryption_details; let nonce = generate_nonce_for_record(nonce_starter, key); + println!("Generated nonce for decrypt {nonce:?}"); + match cipher.decrypt(&nonce, record.value.as_ref()) { Ok(value) => { + println!("Decrypted value {value:?}"); record.value = value; return Some(Cow::Owned(record)); } Err(error) => { - error!("Error while decrypting record. key: {key:?}: {error:?}"); + println!("Error while decrypting record. key: {key:?}: {error:?}"); None } } @@ -452,7 +479,7 @@ impl NodeRecordStore { match fs::read(file_path) { Ok(bytes) => { // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues): - info!( + println!( "Retrieved record from disk! filename: {filename} after {:?}", start.elapsed() ); @@ -460,7 +487,7 @@ impl NodeRecordStore { Self::get_record_from_bytes(bytes, key, encryption_details) } Err(err) => { - error!("Error while reading file. filename: {filename}, error: {err:?}"); + println!("Error while reading file. 
@@ -622,13 +649,18 @@ impl NodeRecordStore {
             return Some(record.value);
         }

+        println!("Value to encrypt: {:?}", record.value);
         let (cipher, nonce_starter) = encryption_details;
         let nonce = generate_nonce_for_record(&nonce_starter, &record.key);
+        println!("Generated nonce for encrypt {nonce:?}");
         match cipher.encrypt(&nonce, record.value.as_ref()) {
-            Ok(value) => Some(value),
+            Ok(value) => {
+                println!("encrypted value {value:?}");
+                Some(value)
+            }
             Err(error) => {
-                warn!(
+                println!(
                     "Failed to encrypt record {:?} : {error:?}",
                     PrettyPrintRecordKey::from(&record.key),
                 );
@@ -645,7 +677,7 @@ impl NodeRecordStore {
     pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> {
         let key = &r.key;
         let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
-        debug!("PUTting a verified Record: {record_key:?}");
+        println!("PUTting a verified Record: {record_key:?}");

         // if cache already has the record :
         //   * if with same content, do nothing and return early
@@ -684,12 +716,12 @@ impl NodeRecordStore {
             let cmd = match fs::write(&file_path, bytes) {
                 Ok(_) => {
                     // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
-                    info!("Wrote record {record_key2:?} to disk! filename: {filename}");
+                    println!("Wrote record {record_key2:?} to disk {file_path:?}!");

                     LocalSwarmCmd::AddLocalRecordAsStored { key, record_type }
                 }
                 Err(err) => {
-                    error!(
+                    println!(
                         "Error writing record {record_key2:?} filename: {filename}, error: {err:?}"
                     );
                     LocalSwarmCmd::RemoveFailedLocalRecord { key }
                 }
@@ -789,11 +821,11 @@ impl RecordStore for NodeRecordStore {
         }

         if !self.records.contains_key(k) {
-            debug!("Record not found locally: {key:?}");
+            println!("Record not found locally: {key:?}");
             return None;
         }

-        debug!("GET request for Record key: {key}");
+        println!("GET request for Record key: {key}");

         Self::read_from_disk(&self.encryption_details, k, &self.config.storage_dir)
     }
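Why restarting with a different seed cleans the store: AES-256-GCM-SIV is an authenticated cipher, so a ciphertext written under one derived key fails outright (rather than silently mis-decrypting) under another, which is exactly the signal the repopulation path above uses to remove stale records. A short sketch, assuming the aes-gcm-siv 0.11 API already imported above:

    use aes_gcm_siv::{
        aead::{Aead, KeyInit},
        Aes256GcmSiv, Key, Nonce,
    };

    fn main() {
        let cipher_a = Aes256GcmSiv::new(Key::<Aes256GcmSiv>::from_slice(&[1u8; 32]));
        let cipher_b = Aes256GcmSiv::new(Key::<Aes256GcmSiv>::from_slice(&[2u8; 32]));
        let nonce = Nonce::from_slice(&[0u8; 12]);

        let ciphertext = cipher_a
            .encrypt(nonce, b"record bytes".as_ref())
            .expect("encryption should succeed");

        // The matching key round-trips the record...
        assert!(cipher_a.decrypt(nonce, ciphertext.as_ref()).is_ok());
        // ...while a key derived from a different seed is rejected, letting
        // the store detect and clean up foreign records on start-up.
        assert!(cipher_b.decrypt(nonce, ciphertext.as_ref()).is_err());
    }
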
should be stored"); + + // Sleep a while to let OS completes the flush to disk + sleep(Duration::from_secs(10)).await; + + // Restart the store with same encrypt_seed + drop(store); + let store = NodeRecordStore::with_config( + self_id, + store_config, + network_event_sender.clone(), + swarm_cmd_sender.clone(), + ); + + // Sleep a lit bit to let OS completes restoring + sleep(Duration::from_secs(1)).await; + + // Verify the record still exists + let stored_record = store.get(&record.key); + assert!(stored_record.is_some(), "Chunk should be stored"); + + // Restart the store with different encrypt_seed + let self_id_diff = PeerId::random(); + let store_config_diff = NodeRecordStoreConfig { + storage_dir: temp_dir.to_path_buf(), + encryption_seed: [2u8; 16], + ..Default::default() + }; + let store_diff = NodeRecordStore::with_config( + self_id_diff, + store_config_diff, + network_event_sender, + swarm_cmd_sender, + ); + + // Sleep a lit bit to let OS completes restoring (if has) + sleep(Duration::from_secs(1)).await; + + // Verify the record is gone + assert!( + store_diff.get(&record.key).is_none(), + "Chunk should be gone" + ); + } + #[tokio::test] async fn can_store_and_retrieve_chunk() { let temp_dir = std::env::temp_dir(); @@ -1247,7 +1367,7 @@ mod tests { // Create a record from the chunk let record = Record { key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(), - value: chunk_data.to_vec(), + value: try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec(), expires: None, publisher: None, };