diff --git a/.changelog/unreleased/improvements/1977-replay-protection-storage.md b/.changelog/unreleased/improvements/1977-replay-protection-storage.md new file mode 100644 index 0000000000..0686adca5f --- /dev/null +++ b/.changelog/unreleased/improvements/1977-replay-protection-storage.md @@ -0,0 +1,2 @@ +- Reduced the storage consumption of replay protection. + ([\#1977](https://github.com/anoma/namada/pull/1977)) \ No newline at end of file diff --git a/apps/src/lib/node/ledger/shell/finalize_block.rs b/apps/src/lib/node/ledger/shell/finalize_block.rs index 53252065f1..2d14e3134d 100644 --- a/apps/src/lib/node/ledger/shell/finalize_block.rs +++ b/apps/src/lib/node/ledger/shell/finalize_block.rs @@ -8,10 +8,11 @@ use namada::ledger::events::EventType; use namada::ledger::gas::{GasMetering, TxGasMeter}; use namada::ledger::parameters::storage as params_storage; use namada::ledger::pos::{namada_proof_of_stake, staking_token_address}; +use namada::ledger::storage::wl_storage::WriteLogAndStorage; use namada::ledger::storage::EPOCH_SWITCH_BLOCKS_DELAY; use namada::ledger::storage_api::token::credit_tokens; use namada::ledger::storage_api::{pgf, StorageRead, StorageWrite}; -use namada::ledger::{inflation, protocol, replay_protection}; +use namada::ledger::{inflation, protocol}; use namada::proof_of_stake::{ delegator_rewards_products_handle, find_validator_by_raw_hash, read_last_block_proposer_address, read_pos_params, read_total_stake, @@ -87,6 +88,14 @@ where self.wl_storage.storage.update_epoch_blocks_delay ); + // Finalize the transactions' hashes from the previous block + for hash in self.wl_storage.storage.iter_replay_protection() { + self.wl_storage + .write_log + .finalize_tx_hash(hash) + .expect("Failed tx hashes finalization") + } + if new_epoch { namada::ledger::storage::update_allowed_conversions( &mut self.wl_storage, @@ -203,24 +212,17 @@ where tx_event["gas_used"] = "0".into(); response.events.push(tx_event); // if the rejected tx was decrypted, remove it - // from the queue of txs to be processed and remove the hash - // from storage + // from the queue of txs to be processed, remove its hash + // from storage and write the hash of the corresponding wrapper if let TxType::Decrypted(_) = &tx_header.tx_type { - let tx_hash = self + let wrapper_tx = self .wl_storage .storage .tx_queue .pop() .expect("Missing wrapper tx in queue") - .tx - .clone() - .update_header(TxType::Raw) - .header_hash(); - let tx_hash_key = - replay_protection::get_replay_protection_key(&tx_hash); - self.wl_storage - .delete(&tx_hash_key) - .expect("Error while deleting tx hash from storage"); + .tx; + self.allow_tx_replay(wrapper_tx); } #[cfg(not(any(feature = "abciplus", feature = "abcipp")))] @@ -276,7 +278,7 @@ where continue; } - let (mut tx_event, tx_unsigned_hash, mut tx_gas_meter, wrapper) = + let (mut tx_event, embedding_wrapper, mut tx_gas_meter, wrapper) = match &tx_header.tx_type { TxType::Wrapper(wrapper) => { stats.increment_wrapper_txs(); @@ -286,7 +288,7 @@ where } TxType::Decrypted(inner) => { // We remove the corresponding wrapper tx from the queue - let mut tx_in_queue = self + let tx_in_queue = self .wl_storage .storage .tx_queue @@ -323,12 +325,7 @@ where ( event, - Some( - tx_in_queue - .tx - .update_header(TxType::Raw) - .header_hash(), - ), + Some(tx_in_queue.tx), TxGasMeter::new_from_sub_limit(tx_in_queue.gas), None, ) @@ -511,18 +508,19 @@ where // If transaction type is Decrypted and failed because of // out of gas, remove its hash from storage to allow // rewrapping it - if let Some(hash) = tx_unsigned_hash { + if let Some(wrapper) = embedding_wrapper { if let Error::TxApply(protocol::Error::GasError(_)) = msg { - let tx_hash_key = - replay_protection::get_replay_protection_key( - &hash, - ); - self.wl_storage.delete(&tx_hash_key).expect( - "Error while deleting tx hash key from storage", - ); + self.allow_tx_replay(wrapper); } + } else if let Some(wrapper) = wrapper { + // If transaction type was Wrapper and failed, write its + // hash to storage to prevent + // replay + self.wl_storage + .write_tx_hash(wrapper.header_hash()) + .expect("Error while writing tx hash to storage"); } tx_event["gas_used"] = @@ -960,6 +958,18 @@ where } Ok(()) } + + // Allow to replay a specific wasm transaction. Needs as argument the + // corresponding wrapper transaction to avoid replay of that in the process + fn allow_tx_replay(&mut self, mut wrapper_tx: Tx) { + self.wl_storage + .write_tx_hash(wrapper_tx.header_hash()) + .expect("Error while deleting tx hash from storage"); + + self.wl_storage + .delete_tx_hash(wrapper_tx.update_header(TxType::Raw).header_hash()) + .expect("Error while deleting tx hash from storage"); + } } /// Convert ABCI vote info to PoS vote info. Any info which fails the conversion @@ -1045,6 +1055,7 @@ mod test_finalize_block { use namada::core::ledger::governance::storage::vote::{ StorageProposalVote, VoteType, }; + use namada::core::ledger::replay_protection; use namada::eth_bridge::storage::bridge_pool::{ self, get_key_from_hash, get_nonce_key, get_signed_root_key, }; @@ -2269,15 +2280,13 @@ mod test_finalize_block { let (wrapper_tx, processed_tx) = mk_wrapper_tx(&shell, &crate::wallet::defaults::albert_keypair()); - let wrapper_hash_key = replay_protection::get_replay_protection_key( - &wrapper_tx.header_hash(), - ); let mut decrypted_tx = wrapper_tx; decrypted_tx.update_header(TxType::Raw); - let decrypted_hash_key = replay_protection::get_replay_protection_key( - &decrypted_tx.header_hash(), - ); + let decrypted_hash_key = + replay_protection::get_replay_protection_last_key( + &decrypted_tx.header_hash(), + ); // merkle tree root before finalize_block let root_pre = shell.shell.wl_storage.storage.block.tree.root(); @@ -2303,20 +2312,16 @@ mod test_finalize_block { let root_post = shell.shell.wl_storage.storage.block.tree.root(); assert_eq!(root_pre.0, root_post.0); - // Check transactions' hashes in storage - assert!(shell.shell.wl_storage.has_key(&wrapper_hash_key).unwrap()); - assert!(shell.shell.wl_storage.has_key(&decrypted_hash_key).unwrap()); - // Check that non of the hashes is present in the merkle tree + // Check transaction's hash in storage assert!( - !shell + shell .shell .wl_storage - .storage - .block - .tree - .has_key(&wrapper_hash_key) - .unwrap() + .write_log + .has_replay_protection_entry(&decrypted_tx.header_hash()) + .unwrap_or_default() ); + // Check that the hash is present in the merkle tree assert!( !shell .shell @@ -2329,152 +2334,180 @@ mod test_finalize_block { ); } - /// Test that if a decrypted transaction fails because of out-of-gas, its - /// hash is removed from storage to allow rewrapping it + /// Test replay protection hash handling #[test] - fn test_remove_tx_hash() { + fn test_tx_hash_handling() { let (mut shell, _, _, _) = setup(); let keypair = gen_keypair(); + let mut batch = + namada::core::ledger::storage::testing::TestStorage::batch(); - let mut wasm_path = top_level_directory(); - wasm_path.push("wasm_for_tests/tx_no_op.wasm"); - let tx_code = std::fs::read(wasm_path) - .expect("Expected a file at given code path"); - let mut wrapper_tx = + let (wrapper_tx, _) = mk_wrapper_tx(&shell, &keypair); + let (wrapper_tx_2, _) = mk_wrapper_tx(&shell, &keypair); + let mut invalid_wrapper_tx = Tx::from_type(TxType::Wrapper(Box::new(WrapperTx::new( Fee { - amount_per_gas_unit: Amount::zero(), + amount_per_gas_unit: 0.into(), token: shell.wl_storage.storage.native_token.clone(), }, keypair.ref_to(), Epoch(0), - GAS_LIMIT_MULTIPLIER.into(), + 0.into(), None, )))); - wrapper_tx.header.chain_id = shell.chain_id.clone(); - wrapper_tx.set_code(Code::new(tx_code)); - wrapper_tx.set_data(Data::new( + invalid_wrapper_tx.header.chain_id = shell.chain_id.clone(); + invalid_wrapper_tx + .set_code(Code::new("wasm_code".as_bytes().to_owned())); + invalid_wrapper_tx.set_data(Data::new( "Encrypted transaction data".as_bytes().to_owned(), )); + invalid_wrapper_tx.add_section(Section::Signature(Signature::new( + invalid_wrapper_tx.sechashes(), + [(0, keypair)].into_iter().collect(), + None, + ))); + + let wrapper_hash = wrapper_tx.header_hash(); + let wrapper_2_hash = wrapper_tx_2.header_hash(); + let invalid_wrapper_hash = invalid_wrapper_tx.header_hash(); let mut decrypted_tx = wrapper_tx.clone(); + let mut decrypted_tx_2 = wrapper_tx_2.clone(); decrypted_tx.update_header(TxType::Decrypted(DecryptedTx::Decrypted)); + decrypted_tx_2.update_header(TxType::Decrypted(DecryptedTx::Decrypted)); + let decrypted_hash = + wrapper_tx.clone().update_header(TxType::Raw).header_hash(); + let decrypted_2_hash = wrapper_tx_2 + .clone() + .update_header(TxType::Raw) + .header_hash(); + let decrypted_3_hash = invalid_wrapper_tx + .clone() + .update_header(TxType::Raw) + .header_hash(); + + // Write inner hashes in storage + for hash in [&decrypted_hash, &decrypted_2_hash] { + let hash_subkey = + replay_protection::get_replay_protection_last_subkey(hash); + shell + .wl_storage + .storage + .write_replay_protection_entry(&mut batch, &hash_subkey) + .expect("Test failed"); + } - // Write inner hash in storage - let inner_hash_key = replay_protection::get_replay_protection_key( - &wrapper_tx.clone().update_header(TxType::Raw).header_hash(), - ); - shell - .wl_storage - .storage - .write(&inner_hash_key, vec![]) - .expect("Test failed"); - - let processed_tx = ProcessedTx { + // Invalid wrapper tx that should lead to a commitment of the wrapper + // hash and no commitment of the inner hash + let mut processed_txs = vec![ProcessedTx { + tx: invalid_wrapper_tx.to_bytes(), + result: TxResult { + code: ErrorCodes::Ok.into(), + info: "".into(), + }, + }]; + // Out of gas error triggering inner hash removal and wrapper hash + // insert + processed_txs.push(ProcessedTx { tx: decrypted_tx.to_bytes(), result: TxResult { code: ErrorCodes::Ok.into(), info: "".into(), }, - }; + }); + // Wasm error that still leads to inner hash commitment and no wrapper + // hash insert + processed_txs.push(ProcessedTx { + tx: decrypted_tx_2.to_bytes(), + result: TxResult { + code: ErrorCodes::Ok.into(), + info: "".into(), + }, + }); shell.enqueue_tx(wrapper_tx, Gas::default()); + shell.enqueue_tx(wrapper_tx_2, GAS_LIMIT_MULTIPLIER.into()); // merkle tree root before finalize_block let root_pre = shell.shell.wl_storage.storage.block.tree.root(); let event = &shell .finalize_block(FinalizeBlock { - txs: vec![processed_tx], + txs: processed_txs, ..Default::default() }) - .expect("Test failed")[0]; + .expect("Test failed"); // the merkle tree root should not change after finalize_block let root_post = shell.shell.wl_storage.storage.block.tree.root(); assert_eq!(root_pre.0, root_post.0); - // Check inner tx hash has been removed from storage - assert_eq!(event.event_type.to_string(), String::from("applied")); - let code = event.attributes.get("code").expect("Testfailed").as_str(); + // Check first inner tx hash has been removed from storage but + // corresponding wrapper hash is still there Check second inner + // tx is still there and corresponding wrapper hash has been removed + // since useless + assert_eq!(event[0].event_type.to_string(), String::from("accepted")); + let code = event[0] + .attributes + .get("code") + .expect("Test failed") + .as_str(); + assert_eq!(code, String::from(ErrorCodes::InvalidTx).as_str()); + assert_eq!(event[1].event_type.to_string(), String::from("applied")); + let code = event[1] + .attributes + .get("code") + .expect("Test failed") + .as_str(); + assert_eq!(code, String::from(ErrorCodes::WasmRuntimeError).as_str()); + assert_eq!(event[2].event_type.to_string(), String::from("applied")); + let code = event[2] + .attributes + .get("code") + .expect("Test failed") + .as_str(); assert_eq!(code, String::from(ErrorCodes::WasmRuntimeError).as_str()); + assert!( + shell + .wl_storage + .write_log + .has_replay_protection_entry(&invalid_wrapper_hash) + .unwrap_or_default() + ); assert!( !shell .wl_storage - .has_key(&inner_hash_key) - .expect("Test failed") - ) - } - - #[test] - /// Test that the hash of the wrapper transaction is committed to storage - /// even if the wrapper tx fails. The inner transaction hash must instead be - /// removed - fn test_commits_hash_if_wrapper_failure() { - let (mut shell, _, _, _) = setup(); - let keypair = gen_keypair(); - - let mut wrapper = - Tx::from_type(TxType::Wrapper(Box::new(WrapperTx::new( - Fee { - amount_per_gas_unit: 0.into(), - token: shell.wl_storage.storage.native_token.clone(), - }, - keypair.ref_to(), - Epoch(0), - 0.into(), - None, - )))); - wrapper.header.chain_id = shell.chain_id.clone(); - wrapper.set_code(Code::new("wasm_code".as_bytes().to_owned())); - wrapper.set_data(Data::new( - "Encrypted transaction data".as_bytes().to_owned(), - )); - wrapper.add_section(Section::Signature(Signature::new( - wrapper.sechashes(), - [(0, keypair)].into_iter().collect(), - None, - ))); - - let wrapper_hash_key = replay_protection::get_replay_protection_key( - &wrapper.header_hash(), + .write_log + .has_replay_protection_entry(&decrypted_3_hash) + .unwrap_or_default() ); - let inner_hash_key = replay_protection::get_replay_protection_key( - &wrapper.clone().update_header(TxType::Raw).header_hash(), + assert!( + !shell + .wl_storage + .write_log + .has_replay_protection_entry(&decrypted_hash) + .unwrap_or_default() + ); + assert!( + shell + .wl_storage + .write_log + .has_replay_protection_entry(&wrapper_hash) + .unwrap_or_default() ); - - let processed_tx = ProcessedTx { - tx: wrapper.to_bytes(), - result: TxResult { - code: ErrorCodes::Ok.into(), - info: "".into(), - }, - }; - - let event = &shell - .finalize_block(FinalizeBlock { - txs: vec![processed_tx], - ..Default::default() - }) - .expect("Test failed")[0]; - - // Check wrapper hash has been committed to storage even if it failed. - // Check that, instead, the inner hash has been removed - assert_eq!(event.event_type.to_string(), String::from("accepted")); - let code = event.attributes.get("code").expect("Testfailed").as_str(); - assert_eq!(code, String::from(ErrorCodes::InvalidTx).as_str()); - assert!( shell .wl_storage - .has_key(&wrapper_hash_key) - .expect("Test failed") + .storage + .has_replay_protection_entry(&decrypted_2_hash) + .expect("test failed") ); assert!( !shell .wl_storage - .has_key(&inner_hash_key) - .expect("Test failed") - ) + .write_log + .has_replay_protection_entry(&wrapper_2_hash) + .unwrap_or_default() + ); } // Test that if the fee payer doesn't have enough funds for fee payment the diff --git a/apps/src/lib/node/ledger/shell/mod.rs b/apps/src/lib/node/ledger/shell/mod.rs index a1c17fe450..5315f16d8d 100644 --- a/apps/src/lib/node/ledger/shell/mod.rs +++ b/apps/src/lib/node/ledger/shell/mod.rs @@ -42,13 +42,14 @@ use namada::ledger::protocol::{ apply_wasm_tx, get_fee_unshielding_transaction, get_transfer_hash_from_storage, ShellParams, }; +use namada::ledger::storage::wl_storage::WriteLogAndStorage; use namada::ledger::storage::write_log::WriteLog; use namada::ledger::storage::{ DBIter, Sha256Hasher, Storage, StorageHasher, TempWlStorage, WlStorage, DB, EPOCH_SWITCH_BLOCKS_DELAY, }; use namada::ledger::storage_api::{self, StorageRead}; -use namada::ledger::{parameters, pos, protocol, replay_protection}; +use namada::ledger::{parameters, pos, protocol}; use namada::proof_of_stake::{self, process_slashes, read_pos_params, slash}; use namada::proto::{self, Section, Tx}; use namada::types::address::Address; @@ -63,7 +64,7 @@ use namada::types::transaction::{ hash_tx, verify_decrypted_correctly, AffineCurve, DecryptedTx, EllipticCurve, PairingEngine, TxType, WrapperTx, }; -use namada::types::{address, hash, token}; +use namada::types::{address, token}; use namada::vm::wasm::{TxCache, VpCache}; use namada::vm::{WasmCacheAccess, WasmCacheRwAccess}; use num_derive::{FromPrimitive, ToPrimitive}; @@ -929,51 +930,40 @@ where pub fn replay_protection_checks( &self, wrapper: &Tx, - tx_bytes: &[u8], temp_wl_storage: &mut TempWlStorage<D, H>, ) -> Result<()> { - let inner_tx_hash = - wrapper.clone().update_header(TxType::Raw).header_hash(); - let inner_hash_key = - replay_protection::get_replay_protection_key(&inner_tx_hash); + let wrapper_hash = wrapper.header_hash(); if temp_wl_storage - .has_key(&inner_hash_key) - .expect("Error while checking inner tx hash key in storage") + .has_replay_protection_entry(&wrapper_hash) + .expect("Error while checking wrapper tx hash key in storage") { return Err(Error::ReplayAttempt(format!( - "Inner transaction hash {} already in storage", - &inner_tx_hash, + "Wrapper transaction hash {} already in storage", + wrapper_hash ))); } - // Write inner hash to tx WAL + // Write wrapper hash to tx WAL temp_wl_storage - .write_log - .write(&inner_hash_key, vec![]) - .expect("Couldn't write inner transaction hash to write log"); + .write_tx_hash(wrapper_hash) + .map_err(|e| Error::ReplayAttempt(e.to_string()))?; - let tx = - Tx::try_from(tx_bytes).expect("Deserialization shouldn't fail"); - let wrapper_hash = tx.header_hash(); - let wrapper_hash_key = - replay_protection::get_replay_protection_key(&wrapper_hash); + let inner_tx_hash = + wrapper.clone().update_header(TxType::Raw).header_hash(); if temp_wl_storage - .has_key(&wrapper_hash_key) - .expect("Error while checking wrapper tx hash key in storage") + .has_replay_protection_entry(&inner_tx_hash) + .expect("Error while checking inner tx hash key in storage") { return Err(Error::ReplayAttempt(format!( - "Wrapper transaction hash {} already in storage", - wrapper_hash + "Inner transaction hash {} already in storage", + &inner_tx_hash, ))); } - // Write wrapper hash to tx WAL + // Write inner hash to tx WAL temp_wl_storage - .write_log - .write(&wrapper_hash_key, vec![]) - .expect("Couldn't write wrapper tx hash to write log"); - - Ok(()) + .write_tx_hash(inner_tx_hash) + .map_err(|e| Error::ReplayAttempt(e.to_string())) } /// If a handle to an Ethereum oracle was provided to the [`Shell`], attempt @@ -1266,14 +1256,11 @@ where let mut inner_tx = tx; inner_tx.update_header(TxType::Raw); let inner_tx_hash = &inner_tx.header_hash(); - let inner_hash_key = - replay_protection::get_replay_protection_key(inner_tx_hash); if self .wl_storage .storage - .has_key(&inner_hash_key) + .has_replay_protection_entry(inner_tx_hash) .expect("Error while checking inner tx hash key in storage") - .0 { response.code = ErrorCodes::ReplayTx.into(); response.log = format!( @@ -1286,17 +1273,14 @@ where let tx = Tx::try_from(tx_bytes) .expect("Deserialization shouldn't fail"); - let wrapper_hash = hash::Hash(tx.header_hash().0); - let wrapper_hash_key = - replay_protection::get_replay_protection_key(&wrapper_hash); + let wrapper_hash = &tx.header_hash(); if self .wl_storage .storage - .has_key(&wrapper_hash_key) + .has_replay_protection_entry(wrapper_hash) .expect( "Error while checking wrapper tx hash key in storage", ) - .0 { response.code = ErrorCodes::ReplayTx.into(); response.log = format!( @@ -2323,6 +2307,7 @@ mod abciplus_mempool_tests { #[cfg(test)] mod tests { + use namada::ledger::replay_protection; use namada::proof_of_stake::Epoch; use namada::proto::{Code, Data, Section, Signature, Tx}; use namada::types::transaction::{Fee, WrapperTx}; @@ -2464,13 +2449,15 @@ mod tests { ))); // Write wrapper hash to storage + let mut batch = + namada::core::ledger::storage::testing::TestStorage::batch(); let wrapper_hash = wrapper.header_hash(); let wrapper_hash_key = - replay_protection::get_replay_protection_key(&wrapper_hash); + replay_protection::get_replay_protection_last_subkey(&wrapper_hash); shell .wl_storage .storage - .write(&wrapper_hash_key, wrapper_hash) + .write_replay_protection_entry(&mut batch, &wrapper_hash_key) .expect("Test failed"); // Try wrapper tx replay attack @@ -2506,11 +2493,13 @@ mod tests { wrapper.clone().update_header(TxType::Raw).header_hash(); // Write inner hash in storage let inner_hash_key = - replay_protection::get_replay_protection_key(&inner_tx_hash); + replay_protection::get_replay_protection_last_subkey( + &inner_tx_hash, + ); shell .wl_storage .storage - .write(&inner_hash_key, inner_tx_hash) + .write_replay_protection_entry(&mut batch, &inner_hash_key) .expect("Test failed"); // Try inner tx replay attack diff --git a/apps/src/lib/node/ledger/shell/prepare_proposal.rs b/apps/src/lib/node/ledger/shell/prepare_proposal.rs index 3687a6d39b..20b9105875 100644 --- a/apps/src/lib/node/ledger/shell/prepare_proposal.rs +++ b/apps/src/lib/node/ledger/shell/prepare_proposal.rs @@ -245,8 +245,11 @@ where let mut tx_gas_meter = TxGasMeter::new(wrapper.gas_limit); tx_gas_meter.add_tx_size_gas(tx_bytes).map_err(|_| ())?; - // Check replay protection - self.replay_protection_checks(&tx, tx_bytes, temp_wl_storage) + // Check replay protection, safe to do here. Even if the tx is a + // replay attempt, we can leave its hashes in the write log since, + // having we already checked the signature, no other tx with the + // same hash can ba deemed valid + self.replay_protection_checks(&tx, temp_wl_storage) .map_err(|_| ())?; // Check fees @@ -1188,7 +1191,7 @@ mod test_prepare_proposal { // Write wrapper hash to storage let wrapper_unsigned_hash = wrapper.header_hash(); - let hash_key = replay_protection::get_replay_protection_key( + let hash_key = replay_protection::get_replay_protection_last_key( &wrapper_unsigned_hash, ); shell @@ -1283,8 +1286,9 @@ mod test_prepare_proposal { wrapper.clone().update_header(TxType::Raw).header_hash(); // Write inner hash to storage - let hash_key = - replay_protection::get_replay_protection_key(&inner_unsigned_hash); + let hash_key = replay_protection::get_replay_protection_last_key( + &inner_unsigned_hash, + ); shell .wl_storage .storage @@ -1313,7 +1317,7 @@ mod test_prepare_proposal { let (shell, _recv, _, _) = test_utils::setup(); let keypair = crate::wallet::defaults::daewon_keypair(); - let keypair_2 = crate::wallet::defaults::daewon_keypair(); + let keypair_2 = crate::wallet::defaults::albert_keypair(); let mut wrapper = Tx::from_type(TxType::Wrapper(Box::new(WrapperTx::new( Fee { diff --git a/apps/src/lib/node/ledger/shell/process_proposal.rs b/apps/src/lib/node/ledger/shell/process_proposal.rs index ab544de3f8..6f1c7c83b9 100644 --- a/apps/src/lib/node/ledger/shell/process_proposal.rs +++ b/apps/src/lib/node/ledger/shell/process_proposal.rs @@ -878,11 +878,9 @@ where } } else { // Replay protection checks - if let Err(e) = self.replay_protection_checks( - &tx, - tx_bytes, - temp_wl_storage, - ) { + if let Err(e) = + self.replay_protection_checks(&tx, temp_wl_storage) + { return TxResult { code: ErrorCodes::ReplayTx.into(), info: e.to_string(), @@ -988,6 +986,7 @@ mod test_process_proposal { #[cfg(feature = "abcipp")] use assert_matches::assert_matches; + use namada::ledger::replay_protection; use namada::ledger::storage_api::StorageWrite; use namada::proto::{ Code, Data, Section, SignableEthMessage, Signature, Signed, @@ -2131,14 +2130,16 @@ mod test_process_proposal { ))); // Write wrapper hash to storage + let mut batch = + namada::core::ledger::storage::testing::TestStorage::batch(); let wrapper_unsigned_hash = wrapper.header_hash(); - let hash_key = replay_protection::get_replay_protection_key( + let hash_key = replay_protection::get_replay_protection_last_subkey( &wrapper_unsigned_hash, ); shell .wl_storage .storage - .write(&hash_key, vec![]) + .write_replay_protection_entry(&mut batch, &hash_key) .expect("Test failed"); // Run validation @@ -2224,12 +2225,9 @@ mod test_process_proposal { assert_eq!( response[1].result.info, format!( - "Transaction replay attempt: Inner transaction hash \ + "Transaction replay attempt: Wrapper transaction hash \ {} already in storage", - wrapper - .clone() - .update_header(TxType::Raw) - .header_hash(), + wrapper.header_hash(), ) ); } @@ -2267,12 +2265,15 @@ mod test_process_proposal { wrapper.clone().update_header(TxType::Raw).header_hash(); // Write inner hash to storage - let hash_key = - replay_protection::get_replay_protection_key(&inner_unsigned_hash); + let mut batch = + namada::core::ledger::storage::testing::TestStorage::batch(); + let hash_key = replay_protection::get_replay_protection_last_subkey( + &inner_unsigned_hash, + ); shell .wl_storage .storage - .write(&hash_key, vec![]) + .write_replay_protection_entry(&mut batch, &hash_key) .expect("Test failed"); // Run validation @@ -2305,7 +2306,7 @@ mod test_process_proposal { let (shell, _recv, _, _) = test_utils::setup(); let keypair = crate::wallet::defaults::daewon_keypair(); - let keypair_2 = crate::wallet::defaults::daewon_keypair(); + let keypair_2 = crate::wallet::defaults::albert_keypair(); let mut wrapper = Tx::from_type(TxType::Wrapper(Box::new(WrapperTx::new( diff --git a/apps/src/lib/node/ledger/storage/rocksdb.rs b/apps/src/lib/node/ledger/storage/rocksdb.rs index 61eb2c32e5..ee9a1c658b 100644 --- a/apps/src/lib/node/ledger/storage/rocksdb.rs +++ b/apps/src/lib/node/ledger/storage/rocksdb.rs @@ -12,6 +12,7 @@ //! epoch can start //! - `next_epoch_min_start_time`: minimum block time from which the next //! epoch can start +//! - `replay_protection`: hashes of the processed transactions //! - `pred`: predecessor values of the top-level keys of the same name //! - `tx_queue` //! - `next_epoch_min_start_height` @@ -32,6 +33,9 @@ //! - `epoch`: block epoch //! - `address_gen`: established address generator //! - `header`: block's header +//! - `replay_protection`: hashes of processed tx +//! - `all`: the hashes included up to the last block +//! - `last`: the hashes included in the last block use std::fs::File; use std::io::BufWriter; @@ -73,6 +77,7 @@ const SUBSPACE_CF: &str = "subspace"; const DIFFS_CF: &str = "diffs"; const STATE_CF: &str = "state"; const BLOCK_CF: &str = "block"; +const REPLAY_PROTECTION_CF: &str = "replay_protection"; /// RocksDB handle #[derive(Debug)] @@ -160,6 +165,21 @@ pub fn open( block_cf_opts.set_block_based_table_factory(&table_opts); cfs.push(ColumnFamilyDescriptor::new(BLOCK_CF, block_cf_opts)); + // for replay protection (read/insert-intensive) + let mut replay_protection_cf_opts = Options::default(); + replay_protection_cf_opts + .set_compression_type(rocksdb::DBCompressionType::Zstd); + replay_protection_cf_opts.set_compression_options(0, 0, 0, 1024 * 1024); + replay_protection_cf_opts.set_level_compaction_dynamic_level_bytes(true); + // Prioritize minimizing read amplification + replay_protection_cf_opts + .set_compaction_style(rocksdb::DBCompactionStyle::Level); + replay_protection_cf_opts.set_block_based_table_factory(&table_opts); + cfs.push(ColumnFamilyDescriptor::new( + REPLAY_PROTECTION_CF, + replay_protection_cf_opts, + )); + rocksdb::DB::open_cf_descriptors(&db_opts, path, cfs) .map(RocksDB) .map_err(|e| Error::DBError(e.into_string())) @@ -353,6 +373,21 @@ impl RocksDB { self.dump_it(cf, None, &mut file); } + // replay protection + // Dump of replay protection keys is possible only at the last height or + // the previous one + if height == last_height { + let cf = self + .get_column_family(REPLAY_PROTECTION_CF) + .expect("Replay protection column family should exist"); + self.dump_it(cf, None, &mut file); + } else if height == last_height - 1 { + let cf = self + .get_column_family(REPLAY_PROTECTION_CF) + .expect("Replay protection column family should exist"); + self.dump_it(cf, Some("all".to_string()), &mut file); + } + println!("Done writing to {}", full_path.to_string_lossy()); } @@ -449,6 +484,11 @@ impl RocksDB { tracing::info!("Removing last block results"); batch.delete_cf(block_cf, format!("results/{}", last_block.height)); + // Delete the tx hashes included in the last block + let reprot_cf = self.get_column_family(REPLAY_PROTECTION_CF)?; + tracing::info!("Removing replay protection hashes"); + batch.delete_cf(reprot_cf, "last"); + // Execute next step in parallel let batch = Mutex::new(batch); @@ -1055,6 +1095,30 @@ impl DB for RocksDB { Ok(Some((stored_height, merkle_tree_stores))) } + fn has_replay_protection_entry( + &self, + hash: &namada::types::hash::Hash, + ) -> Result<bool> { + let replay_protection_cf = + self.get_column_family(REPLAY_PROTECTION_CF)?; + + for prefix in ["last", "all"] { + let key = Key::parse(prefix) + .map_err(Error::KeyError)? + .push(&hash.to_string()) + .map_err(Error::KeyError)?; + if self + .0 + .get_pinned_cf(replay_protection_cf, key.to_string()) + .map_err(|e| Error::DBError(e.into_string()))? + .is_some() + { + return Ok(true); + } + } + Ok(false) + } + fn read_subspace_val(&self, key: &Key) -> Result<Option<Vec<u8>>> { let subspace_cf = self.get_column_family(SUBSPACE_CF)?; self.0 @@ -1341,6 +1405,34 @@ impl DB for RocksDB { None => Ok(()), } } + + fn write_replay_protection_entry( + &mut self, + batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()> { + let replay_protection_cf = + self.get_column_family(REPLAY_PROTECTION_CF)?; + + batch + .0 + .put_cf(replay_protection_cf, key.to_string(), vec![]); + + Ok(()) + } + + fn delete_replay_protection_entry( + &mut self, + batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()> { + let replay_protection_cf = + self.get_column_family(REPLAY_PROTECTION_CF)?; + + batch.0.delete_cf(replay_protection_cf, key.to_string()); + + Ok(()) + } } impl<'iter> DBIter<'iter> for RocksDB { @@ -1382,6 +1474,14 @@ impl<'iter> DBIter<'iter> for RocksDB { ) -> PersistentPrefixIterator<'iter> { iter_diffs_prefix(self, height, false) } + + fn iter_replay_protection(&'iter self) -> Self::PrefixIter { + let replay_protection_cf = self + .get_column_family(REPLAY_PROTECTION_CF) + .expect("{REPLAY_PROTECTION_CF} column family should exist"); + + iter_prefix(self, replay_protection_cf, "last".to_string(), None) + } } fn iter_subspace_prefix<'iter>( diff --git a/benches/native_vps.rs b/benches/native_vps.rs index 77373080c4..763c5f8dc7 100644 --- a/benches/native_vps.rs +++ b/benches/native_vps.rs @@ -23,7 +23,6 @@ use namada::ledger::gas::{TxGasMeter, VpGasMeter}; use namada::ledger::governance::GovernanceVp; use namada::ledger::native_vp::ibc::Ibc; use namada::ledger::native_vp::multitoken::MultitokenVp; -use namada::ledger::native_vp::replay_protection::ReplayProtectionVp; use namada::ledger::native_vp::{Ctx, NativeVp}; use namada::ledger::storage_api::StorageRead; use namada::proto::{Code, Section}; @@ -39,48 +38,6 @@ use namada_benches::{ TX_TRANSFER_WASM, TX_VOTE_PROPOSAL_WASM, }; -fn replay_protection(c: &mut Criterion) { - // Write a random key under the replay protection subspace - let tx = generate_foreign_key_tx(&defaults::albert_keypair()); - let mut shell = BenchShell::default(); - - shell.execute_tx(&tx); - let (verifiers, keys_changed) = shell - .wl_storage - .write_log - .verifiers_and_changed_keys(&BTreeSet::default()); - - let replay_protection = ReplayProtectionVp { - ctx: Ctx::new( - &Address::Internal(InternalAddress::ReplayProtection), - &shell.wl_storage.storage, - &shell.wl_storage.write_log, - &tx, - &TxIndex(0), - VpGasMeter::new_from_tx_meter(&TxGasMeter::new_from_sub_limit( - u64::MAX.into(), - )), - &keys_changed, - &verifiers, - shell.vp_wasm_cache.clone(), - ), - }; - - c.bench_function("vp_replay_protection", |b| { - b.iter(|| { - // NOTE: thiv VP will always fail when triggered so don't assert - // here - replay_protection - .validate_tx( - &tx, - replay_protection.ctx.keys_changed, - replay_protection.ctx.verifiers, - ) - .unwrap() - }) - }); -} - fn governance(c: &mut Criterion) { let mut group = c.benchmark_group("vp_governance"); @@ -476,7 +433,6 @@ fn vp_multitoken(c: &mut Criterion) { criterion_group!( native_vps, - replay_protection, governance, // slash_fund, ibc, diff --git a/core/src/ledger/replay_protection.rs b/core/src/ledger/replay_protection.rs index 56537dfbaf..71332d295c 100644 --- a/core/src/ledger/replay_protection.rs +++ b/core/src/ledger/replay_protection.rs @@ -1,21 +1,32 @@ //! Replay protection storage -use crate::types::address::{Address, InternalAddress}; use crate::types::hash::Hash; -use crate::types::storage::{DbKeySeg, Key, KeySeg}; +use crate::types::storage::Key; -/// Internal replay protection address -pub const ADDRESS: Address = - Address::Internal(InternalAddress::ReplayProtection); +const ERROR_MSG: &str = "Cannot obtain a valid db key"; -/// Check if a key is a replay protection key -pub fn is_replay_protection_key(key: &Key) -> bool { - matches!(&key.segments[0], DbKeySeg::AddressSeg(addr) if addr == &ADDRESS) +/// Get the transaction hash key under the `last` subkey +pub fn get_replay_protection_last_subkey(hash: &Hash) -> Key { + Key::parse("last") + .expect(ERROR_MSG) + .push(&hash.to_string()) + .expect(ERROR_MSG) +} + +/// Get the transaction hash key under the `all` subkey +pub fn get_replay_protection_all_subkey(hash: &Hash) -> Key { + Key::parse("all") + .expect(ERROR_MSG) + .push(&hash.to_string()) + .expect(ERROR_MSG) } -/// Get the transaction hash key -pub fn get_replay_protection_key(hash: &Hash) -> Key { - Key::from(ADDRESS.to_db_key()) +/// Get the full transaction hash key under the `last` subkey +pub fn get_replay_protection_last_key(hash: &Hash) -> Key { + Key::parse("replay_protection") + .expect(ERROR_MSG) + .push(&"last".to_string()) + .expect(ERROR_MSG) .push(&hash.to_string()) - .expect("Cannot obtain a valid db key") + .expect(ERROR_MSG) } diff --git a/core/src/ledger/storage/mockdb.rs b/core/src/ledger/storage/mockdb.rs index 971584e742..dba7257236 100644 --- a/core/src/ledger/storage/mockdb.rs +++ b/core/src/ledger/storage/mockdb.rs @@ -14,6 +14,7 @@ use super::{ }; use crate::ledger::storage::types::{self, KVBytes, PrefixIterator}; use crate::types::ethereum_structs; +use crate::types::hash::Hash; #[cfg(feature = "ferveo-tpke")] use crate::types::internal::TxQueue; use crate::types::storage::{ @@ -413,6 +414,24 @@ impl DB for MockDB { Ok(Some((height, merkle_tree_stores))) } + fn has_replay_protection_entry(&self, hash: &Hash) -> Result<bool> { + let prefix_key = + Key::parse("replay_protection").map_err(Error::KeyError)?; + for prefix in ["last", "all"] { + let key = prefix_key + .push(&prefix.to_string()) + .map_err(Error::KeyError)? + .push(&hash.to_string()) + .map_err(Error::KeyError)?; + + if self.0.borrow().contains_key(&key.to_string()) { + return Ok(true); + } + } + + Ok(false) + } + fn read_subspace_val(&self, key: &Key) -> Result<Option<Vec<u8>>> { let key = Key::parse("subspace").map_err(Error::KeyError)?.join(key); Ok(self.0.borrow().get(&key.to_string()).cloned()) @@ -540,6 +559,37 @@ impl DB for MockDB { None => Ok(()), } } + + fn write_replay_protection_entry( + &mut self, + _batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()> { + let key = Key::parse("replay_protection") + .map_err(Error::KeyError)? + .join(key); + + match self.0.borrow_mut().insert(key.to_string(), vec![]) { + Some(_) => Err(Error::DBError(format!( + "Replay protection key {key} already in storage" + ))), + None => Ok(()), + } + } + + fn delete_replay_protection_entry( + &mut self, + _batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()> { + let key = Key::parse("replay_protection") + .map_err(Error::KeyError)? + .join(key); + + self.0.borrow_mut().remove(&key.to_string()); + + Ok(()) + } } impl<'iter> DBIter<'iter> for MockDB { @@ -581,6 +631,18 @@ impl<'iter> DBIter<'iter> for MockDB { // Mock DB can read only the latest value for now unimplemented!() } + + fn iter_replay_protection(&'iter self) -> Self::PrefixIter { + let db_prefix = "replay_protection/".to_owned(); + let iter = self.0.borrow().clone().into_iter(); + MockPrefixIterator::new( + MockIterator { + prefix: "last".to_string(), + iter, + }, + db_prefix, + ) + } } /// A prefix iterator base for the [`MockPrefixIterator`]. diff --git a/core/src/ledger/storage/mod.rs b/core/src/ledger/storage/mod.rs index 81be7e48a6..f6c607cb35 100644 --- a/core/src/ledger/storage/mod.rs +++ b/core/src/ledger/storage/mod.rs @@ -27,7 +27,6 @@ pub use wl_storage::{ #[cfg(feature = "wasm-runtime")] pub use self::masp_conversions::update_allowed_conversions; pub use self::masp_conversions::{encode_asset_type, ConversionState}; -use super::replay_protection::is_replay_protection_key; use crate::ledger::eth_bridge::storage::bridge_pool::is_pending_transfer_key; use crate::ledger::gas::{ STORAGE_ACCESS_GAS_PER_BYTE, STORAGE_WRITE_GAS_PER_BYTE, @@ -286,6 +285,9 @@ pub trait DB: std::fmt::Debug { height: BlockHeight, ) -> Result<Option<(BlockHeight, MerkleTreeStoresRead)>>; + /// Check if the given replay protection entry exists + fn has_replay_protection_entry(&self, hash: &Hash) -> Result<bool>; + /// Read the latest value for account subspace key from the DB fn read_subspace_val(&self, key: &Key) -> Result<Option<Vec<u8>>>; @@ -353,6 +355,20 @@ pub trait DB: std::fmt::Debug { pruned_epoch: Epoch, pred_epochs: &Epochs, ) -> Result<()>; + + /// Write a replay protection entry + fn write_replay_protection_entry( + &mut self, + batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()>; + + /// Delete a replay protection entry + fn delete_replay_protection_entry( + &mut self, + batch: &mut Self::WriteBatch, + key: &Key, + ) -> Result<()>; } /// A database prefix iterator. @@ -376,6 +392,9 @@ pub trait DBIter<'iter> { /// Read subspace new diffs at a given height fn iter_new_diffs(&'iter self, height: BlockHeight) -> Self::PrefixIter; + + /// Read replay protection storage from the last block + fn iter_replay_protection(&'iter self) -> Self::PrefixIter; } /// Atomic batch write. @@ -570,19 +589,10 @@ where /// Check if the given key is present in storage. Returns the result and the /// gas cost. pub fn has_key(&self, key: &Key) -> Result<(bool, u64)> { - if is_replay_protection_key(key) { - // Replay protection keys are not included in the merkle - // tree - Ok(( - self.db.read_subspace_val(key)?.is_some(), - key.len() as u64 * STORAGE_ACCESS_GAS_PER_BYTE, - )) - } else { - Ok(( - self.block.tree.has_key(key)?, - key.len() as u64 * STORAGE_ACCESS_GAS_PER_BYTE, - )) - } + Ok(( + self.block.tree.has_key(key)?, + key.len() as u64 * STORAGE_ACCESS_GAS_PER_BYTE, + )) } /// Returns a value from the specified subspace and the gas cost @@ -667,8 +677,8 @@ where let height = self.block.height.try_to_vec().expect("Encoding failed"); self.block.tree.update(key, height)?; - } else if !is_replay_protection_key(key) { - // Update the merkle tree for all but replay-protection entries + } else { + // Update the merkle tree self.block.tree.update(key, value)?; } @@ -686,9 +696,7 @@ where // but with gas and storage bytes len diff accounting let mut deleted_bytes_len = 0; if self.has_key(key)?.0 { - if !is_replay_protection_key(key) { - self.block.tree.delete(key)?; - } + self.block.tree.delete(key)?; deleted_bytes_len = self.db.delete_subspace_val(self.block.height, key)?; } @@ -797,44 +805,36 @@ where match old.0.cmp(&new.0) { Ordering::Equal => { // the value was updated - if !is_replay_protection_key(&new_key) { - tree.update( - &new_key, - if is_pending_transfer_key(&new_key) { - target_height.try_to_vec().expect( - "Serialization should never \ - fail", - ) - } else { - new.1.clone() - }, - )? - }; + tree.update( + &new_key, + if is_pending_transfer_key(&new_key) { + target_height.try_to_vec().expect( + "Serialization should never fail", + ) + } else { + new.1.clone() + }, + )?; old_diff = old_diff_iter.next(); new_diff = new_diff_iter.next(); } Ordering::Less => { // the value was deleted - if !is_replay_protection_key(&old_key) { - tree.delete(&old_key)?; - } + tree.delete(&old_key)?; old_diff = old_diff_iter.next(); } Ordering::Greater => { // the value was inserted - if !is_replay_protection_key(&new_key) { - tree.update( - &new_key, - if is_pending_transfer_key(&new_key) { - target_height.try_to_vec().expect( - "Serialization should never \ - fail", - ) - } else { - new.1.clone() - }, - )?; - } + tree.update( + &new_key, + if is_pending_transfer_key(&new_key) { + target_height.try_to_vec().expect( + "Serialization should never fail", + ) + } else { + new.1.clone() + }, + )?; new_diff = new_diff_iter.next(); } } @@ -843,9 +843,7 @@ where // the value was deleted let key = Key::parse(old.0.clone()) .expect("the key should be parsable"); - if !is_replay_protection_key(&key) { - tree.delete(&key)?; - } + tree.delete(&key)?; old_diff = old_diff_iter.next(); } (None, Some(new)) => { @@ -853,18 +851,16 @@ where let key = Key::parse(new.0.clone()) .expect("the key should be parsable"); - if !is_replay_protection_key(&key) { - tree.update( - &key, - if is_pending_transfer_key(&key) { - target_height.try_to_vec().expect( - "Serialization should never fail", - ) - } else { - new.1.clone() - }, - )? - }; + tree.update( + &key, + if is_pending_transfer_key(&key) { + target_height + .try_to_vec() + .expect("Serialization should never fail") + } else { + new.1.clone() + }, + )?; new_diff = new_diff_iter.next(); } (None, None) => break, @@ -1058,8 +1054,8 @@ where let height = self.block.height.try_to_vec().expect("Encoding failed"); self.block.tree.update(key, height)?; - } else if !is_replay_protection_key(key) { - // Update the merkle tree for all but replay-protection entries + } else { + // Update the merkle tree self.block.tree.update(key, value)?; } self.db @@ -1074,10 +1070,8 @@ where batch: &mut D::WriteBatch, key: &Key, ) -> Result<i64> { - if !is_replay_protection_key(key) { - // Update the merkle tree for all but replay-protection entries - self.block.tree.delete(key)?; - } + // Update the merkle tree + self.block.tree.delete(key)?; self.db .batch_delete_subspace_val(batch, self.block.height, key) } @@ -1121,6 +1115,42 @@ where .map(|b| b.height) .unwrap_or_default() } + + /// Check it the given transaction's hash is already present in storage + pub fn has_replay_protection_entry(&self, hash: &Hash) -> Result<bool> { + self.db.has_replay_protection_entry(hash) + } + + /// Write the provided tx hash to storage + pub fn write_replay_protection_entry( + &mut self, + batch: &mut D::WriteBatch, + key: &Key, + ) -> Result<()> { + self.db.write_replay_protection_entry(batch, key) + } + + /// Delete the provided tx hash from storage + pub fn delete_replay_protection_entry( + &mut self, + batch: &mut D::WriteBatch, + key: &Key, + ) -> Result<()> { + self.db.delete_replay_protection_entry(batch, key) + } + + /// Iterate the replay protection storage from the last block + pub fn iter_replay_protection( + &self, + ) -> Box<dyn Iterator<Item = Hash> + '_> { + Box::new(self.db.iter_replay_protection().map(|(key, _, _)| { + key.rsplit_once('/') + .expect("Missing tx hash in storage key") + .1 + .parse() + .expect("Failed hash conversion") + })) + } } impl From<MerkleTreeError> for Error { diff --git a/core/src/ledger/storage/wl_storage.rs b/core/src/ledger/storage/wl_storage.rs index 87107a35c9..d18262c4f3 100644 --- a/core/src/ledger/storage/wl_storage.rs +++ b/core/src/ledger/storage/wl_storage.rs @@ -9,6 +9,7 @@ use crate::ledger::storage::{DBIter, Storage, StorageHasher, DB}; use crate::ledger::storage_api::{ResultExt, StorageRead, StorageWrite}; use crate::ledger::{gas, parameters, storage_api}; use crate::types::address::Address; +use crate::types::hash::Hash; use crate::types::storage::{self, BlockHeight}; use crate::types::time::DateTimeUtc; @@ -55,6 +56,19 @@ where storage, } } + + /// Check if the given tx hash has already been processed + pub fn has_replay_protection_entry( + &self, + hash: &Hash, + ) -> Result<bool, super::Error> { + if let Some(present) = self.write_log.has_replay_protection_entry(hash) + { + return Ok(present); + } + + self.storage.has_replay_protection_entry(hash) + } } /// Common trait for [`WlStorage`] and [`TempWlStorage`], used to implement @@ -78,6 +92,12 @@ pub trait WriteLogAndStorage { /// reference to `WriteLog` when in need of both (avoids complain from the /// borrow checker) fn split_borrow(&mut self) -> (&mut WriteLog, &Storage<Self::D, Self::H>); + + /// Write the provided tx hash to storage. + fn write_tx_hash( + &mut self, + hash: Hash, + ) -> crate::ledger::storage::write_log::Result<()>; } impl<D, H> WriteLogAndStorage for WlStorage<D, H> @@ -103,6 +123,13 @@ where fn split_borrow(&mut self) -> (&mut WriteLog, &Storage<Self::D, Self::H>) { (&mut self.write_log, &self.storage) } + + fn write_tx_hash( + &mut self, + hash: Hash, + ) -> crate::ledger::storage::write_log::Result<()> { + self.write_log.write_tx_hash(hash) + } } impl<D, H> WriteLogAndStorage for TempWlStorage<'_, D, H> @@ -128,6 +155,13 @@ where fn split_borrow(&mut self) -> (&mut WriteLog, &Storage<Self::D, Self::H>) { (&mut self.write_log, (self.storage)) } + + fn write_tx_hash( + &mut self, + hash: Hash, + ) -> crate::ledger::storage::write_log::Result<()> { + self.write_log.write_tx_hash(hash) + } } impl<D, H> WlStorage<D, H> @@ -221,6 +255,14 @@ where } Ok(new_epoch) } + + /// Delete the provided transaction's hash from storage. + pub fn delete_tx_hash( + &mut self, + hash: Hash, + ) -> crate::ledger::storage::write_log::Result<()> { + self.write_log.delete_tx_hash(hash) + } } /// Prefix iterator for [`WlStorage`]. diff --git a/core/src/ledger/storage/write_log.rs b/core/src/ledger/storage/write_log.rs index 84edb8f56d..e563374146 100644 --- a/core/src/ledger/storage/write_log.rs +++ b/core/src/ledger/storage/write_log.rs @@ -10,6 +10,9 @@ use crate::ledger; use crate::ledger::gas::{ STORAGE_ACCESS_GAS_PER_BYTE, STORAGE_WRITE_GAS_PER_BYTE, }; +use crate::ledger::replay_protection::{ + get_replay_protection_all_subkey, get_replay_protection_last_subkey, +}; use crate::ledger::storage::traits::StorageHasher; use crate::ledger::storage::Storage; use crate::types::address::{Address, EstablishedAddressGen, InternalAddress}; @@ -36,6 +39,8 @@ pub enum Error { DeleteVp, #[error("Trying to write a temporary value after deleting")] WriteTempAfterDelete, + #[error("Replay protection key: {0}")] + ReplayProtection(String), } /// Result for functions that may fail @@ -66,6 +71,17 @@ pub enum StorageModification { }, } +#[derive(Debug, Clone)] +/// A replay protection storage modification +enum ReProtStorageModification { + /// Write an entry + Write, + /// Delete an entry + Delete, + /// Finalize an entry + Finalize, +} + /// The write log storage #[derive(Debug, Clone)] pub struct WriteLog { @@ -87,6 +103,9 @@ pub struct WriteLog { tx_precommit_write_log: HashMap<storage::Key, StorageModification>, /// The IBC events for the current transaction ibc_events: BTreeSet<IbcEvent>, + /// Storage modifications for the replay protection storage, always + /// committed regardless of the result of the transaction + replay_protection: HashMap<Hash, ReProtStorageModification>, } /// Write log prefix iterator @@ -113,6 +132,7 @@ impl Default for WriteLog { tx_write_log: HashMap::with_capacity(100), tx_precommit_write_log: HashMap::with_capacity(100), ibc_events: BTreeSet::new(), + replay_protection: HashMap::with_capacity(1_000), } } } @@ -511,10 +531,47 @@ impl WriteLog { StorageModification::Temp { .. } => {} } } + + for (hash, entry) in self.replay_protection.iter() { + match entry { + ReProtStorageModification::Write => storage + .write_replay_protection_entry( + batch, + // Can only write tx hashes to the previous block, no + // further + &get_replay_protection_last_subkey(hash), + ) + .map_err(Error::StorageError)?, + ReProtStorageModification::Delete => storage + .delete_replay_protection_entry( + batch, + // Can only delete tx hashes from the previous block, + // no further + &get_replay_protection_last_subkey(hash), + ) + .map_err(Error::StorageError)?, + ReProtStorageModification::Finalize => { + storage + .write_replay_protection_entry( + batch, + &get_replay_protection_all_subkey(hash), + ) + .map_err(Error::StorageError)?; + storage + .delete_replay_protection_entry( + batch, + &get_replay_protection_last_subkey(hash), + ) + .map_err(Error::StorageError)? + } + } + } + if let Some(address_gen) = self.address_gen.take() { storage.address_gen = address_gen } self.block_write_log.clear(); + self.replay_protection.clear(); Ok(()) } @@ -608,6 +665,71 @@ impl WriteLog { let iter = matches.into_iter(); PrefixIter { iter } } + + /// Check if the given tx hash has already been processed. Returns `None` if + /// the key is not known. + pub fn has_replay_protection_entry(&self, hash: &Hash) -> Option<bool> { + self.replay_protection + .get(hash) + .map(|action| !matches!(action, ReProtStorageModification::Delete)) + } + + /// Write the transaction hash + pub(crate) fn write_tx_hash(&mut self, hash: Hash) -> Result<()> { + if self + .replay_protection + .insert(hash, ReProtStorageModification::Write) + .is_some() + { + // Cannot write an hash if other requests have already been + // committed for the same hash + return Err(Error::ReplayProtection(format!( + "Requested a write on hash {hash} over a previous request" + ))); + } + + Ok(()) + } + + /// Remove the transaction hash + pub(crate) fn delete_tx_hash(&mut self, hash: Hash) -> Result<()> { + match self + .replay_protection + .insert(hash, ReProtStorageModification::Delete) + { + None => Ok(()), + // Allow overwriting a previous finalize request + Some(ReProtStorageModification::Finalize) => Ok(()), + Some(_) => + // Cannot delete an hash that still has to be written to + // storage or has already been deleted + { + Err(Error::ReplayProtection(format!( + "Requested a delete on hash {hash} not yet committed to \ + storage" + ))) + } + } + } + + /// Move the transaction hash of the previous block to the list of all + /// blocks. This functions should be called at the beginning of the block + /// processing, before any other replay protection operation is done + pub fn finalize_tx_hash(&mut self, hash: Hash) -> Result<()> { + if self + .replay_protection + .insert(hash, ReProtStorageModification::Finalize) + .is_some() + { + // Cannot finalize an hash if other requests have already been + // committed for the same hash + return Err(Error::ReplayProtection(format!( + "Requested a finalize on hash {hash} over a previous request" + ))); + } + + Ok(()) + } } #[cfg(test)] @@ -834,6 +956,98 @@ mod tests { assert_eq!(value, None); } + #[test] + fn test_replay_protection_commit() { + let mut storage = + crate::ledger::storage::testing::TestStorage::default(); + let mut write_log = WriteLog::default(); + let mut batch = crate::ledger::storage::testing::TestStorage::batch(); + + // write some replay protection keys + write_log + .write_tx_hash(Hash::sha256("tx1".as_bytes())) + .unwrap(); + write_log + .write_tx_hash(Hash::sha256("tx2".as_bytes())) + .unwrap(); + write_log + .write_tx_hash(Hash::sha256("tx3".as_bytes())) + .unwrap(); + + // commit a block + write_log + .commit_block(&mut storage, &mut batch) + .expect("commit failed"); + + assert!(write_log.replay_protection.is_empty()); + for tx in ["tx1", "tx2", "tx3"] { + assert!( + storage + .has_replay_protection_entry(&Hash::sha256(tx.as_bytes())) + .expect("read failed") + ); + } + + // write some replay protection keys + write_log + .write_tx_hash(Hash::sha256("tx4".as_bytes())) + .unwrap(); + write_log + .write_tx_hash(Hash::sha256("tx5".as_bytes())) + .unwrap(); + write_log + .write_tx_hash(Hash::sha256("tx6".as_bytes())) + .unwrap(); + + // delete previous hash + write_log + .delete_tx_hash(Hash::sha256("tx1".as_bytes())) + .unwrap(); + + // finalize previous hashes + for tx in ["tx2", "tx3"] { + write_log + .finalize_tx_hash(Hash::sha256(tx.as_bytes())) + .unwrap(); + } + + // commit a block + write_log + .commit_block(&mut storage, &mut batch) + .expect("commit failed"); + + assert!(write_log.replay_protection.is_empty()); + for tx in ["tx2", "tx3", "tx4", "tx5", "tx6"] { + assert!( + storage + .has_replay_protection_entry(&Hash::sha256(tx.as_bytes())) + .expect("read failed") + ); + } + assert!( + !storage + .has_replay_protection_entry(&Hash::sha256("tx1".as_bytes())) + .expect("read failed") + ); + + // try to delete finalized hash which shouldn't work + write_log + .delete_tx_hash(Hash::sha256("tx2".as_bytes())) + .unwrap(); + + // commit a block + write_log + .commit_block(&mut storage, &mut batch) + .expect("commit failed"); + + assert!(write_log.replay_protection.is_empty()); + assert!( + storage + .has_replay_protection_entry(&Hash::sha256("tx2".as_bytes())) + .expect("read failed") + ); + } + prop_compose! { fn arb_verifiers_changed_key_tx_all_key() (verifiers_from_tx in testing::arb_verifiers_from_tx()) diff --git a/core/src/types/address.rs b/core/src/types/address.rs index 416b3f059e..7fe431cb86 100644 --- a/core/src/types/address.rs +++ b/core/src/types/address.rs @@ -82,8 +82,6 @@ mod internal { "ano::ETH Bridge Address "; pub const ETH_BRIDGE_POOL: &str = "ano::ETH Bridge Pool Address "; - pub const REPLAY_PROTECTION: &str = - "ano::Replay Protection "; pub const MULTITOKEN: &str = "ano::Multitoken "; pub const PGF: &str = @@ -243,9 +241,6 @@ impl Address { eth_addr.to_canonical().replace("0x", ""); format!("{PREFIX_NUT}::{eth_addr}") } - InternalAddress::ReplayProtection => { - internal::REPLAY_PROTECTION.to_string() - } InternalAddress::Multitoken => { internal::MULTITOKEN.to_string() } @@ -329,9 +324,6 @@ impl Address { internal::ETH_BRIDGE_POOL => { Ok(Address::Internal(InternalAddress::EthBridgePool)) } - internal::REPLAY_PROTECTION => { - Ok(Address::Internal(InternalAddress::ReplayProtection)) - } internal::MULTITOKEN => { Ok(Address::Internal(InternalAddress::Multitoken)) } @@ -572,8 +564,6 @@ pub enum InternalAddress { Erc20(EthAddress), /// Non-usable ERC20 tokens Nut(EthAddress), - /// Replay protection contains transactions' hash - ReplayProtection, /// Multitoken Multitoken, /// Pgf @@ -596,7 +586,6 @@ impl Display for InternalAddress { Self::EthBridgePool => "EthBridgePool".to_string(), Self::Erc20(eth_addr) => format!("Erc20: {}", eth_addr), Self::Nut(eth_addr) => format!("Non-usable token: {eth_addr}"), - Self::ReplayProtection => "ReplayProtection".to_string(), Self::Multitoken => "Multitoken".to_string(), Self::Pgf => "PublicGoodFundings".to_string(), } @@ -892,7 +881,6 @@ pub mod testing { InternalAddress::EthBridgePool => {} InternalAddress::Erc20(_) => {} InternalAddress::Nut(_) => {} - InternalAddress::ReplayProtection => {} InternalAddress::Pgf => {} InternalAddress::Multitoken => {} /* Add new addresses in the * `prop_oneof` below. */ @@ -908,7 +896,6 @@ pub mod testing { Just(InternalAddress::EthBridgePool), Just(arb_erc20()), Just(arb_nut()), - Just(InternalAddress::ReplayProtection), Just(InternalAddress::Multitoken), Just(InternalAddress::Pgf), ] diff --git a/shared/src/ledger/native_vp/mod.rs b/shared/src/ledger/native_vp/mod.rs index 31148a1568..1635b4559b 100644 --- a/shared/src/ledger/native_vp/mod.rs +++ b/shared/src/ledger/native_vp/mod.rs @@ -5,7 +5,6 @@ pub mod ethereum_bridge; pub mod ibc; pub mod multitoken; pub mod parameters; -pub mod replay_protection; use std::cell::RefCell; use std::collections::BTreeSet; diff --git a/shared/src/ledger/native_vp/replay_protection.rs b/shared/src/ledger/native_vp/replay_protection.rs deleted file mode 100644 index a2a2a66f36..0000000000 --- a/shared/src/ledger/native_vp/replay_protection.rs +++ /dev/null @@ -1,53 +0,0 @@ -//! Native VP for replay protection - -use std::collections::BTreeSet; - -use namada_core::ledger::storage; -use namada_core::types::address::Address; -use namada_core::types::storage::Key; -use thiserror::Error; - -use crate::ledger::native_vp::{self, Ctx, NativeVp}; -use crate::proto::Tx; -use crate::vm::WasmCacheAccess; - -#[allow(missing_docs)] -#[derive(Error, Debug)] -pub enum Error { - #[error("Native VP error: {0}")] - NativeVpError(#[from] native_vp::Error), -} - -/// ReplayProtection functions result -pub type Result<T> = std::result::Result<T, Error>; - -/// Replay Protection VP -pub struct ReplayProtectionVp<'a, DB, H, CA> -where - DB: storage::DB + for<'iter> storage::DBIter<'iter>, - H: storage::StorageHasher, - CA: WasmCacheAccess, -{ - /// Context to interact with the host structures. - pub ctx: Ctx<'a, DB, H, CA>, -} - -impl<'a, DB, H, CA> NativeVp for ReplayProtectionVp<'a, DB, H, CA> -where - DB: 'static + storage::DB + for<'iter> storage::DBIter<'iter>, - H: 'static + storage::StorageHasher, - CA: 'static + WasmCacheAccess, -{ - type Error = Error; - - fn validate_tx( - &self, - _tx_data: &Tx, - _keys_changed: &BTreeSet<Key>, - _verifiers: &BTreeSet<Address>, - ) -> Result<bool> { - // VP should prevent any modification of the subspace. - // Changes are only allowed from protocol - Ok(false) - } -} diff --git a/shared/src/ledger/protocol/mod.rs b/shared/src/ledger/protocol/mod.rs index a23b026eea..c1304bb01d 100644 --- a/shared/src/ledger/protocol/mod.rs +++ b/shared/src/ledger/protocol/mod.rs @@ -7,7 +7,7 @@ use eyre::{eyre, WrapErr}; use masp_primitives::transaction::Transaction; use namada_core::ledger::gas::TxGasMeter; use namada_core::ledger::storage::wl_storage::WriteLogAndStorage; -use namada_core::ledger::storage_api::{StorageRead, StorageWrite}; +use namada_core::ledger::storage_api::StorageRead; use namada_core::proto::Section; use namada_core::types::hash::Hash; use namada_core::types::storage::Key; @@ -24,7 +24,6 @@ use crate::ledger::native_vp::ethereum_bridge::vp::EthBridge; use crate::ledger::native_vp::ibc::Ibc; use crate::ledger::native_vp::multitoken::MultitokenVp; use crate::ledger::native_vp::parameters::{self, ParametersVp}; -use crate::ledger::native_vp::replay_protection::ReplayProtectionVp; use crate::ledger::native_vp::{self, NativeVp}; use crate::ledger::pgf::PgfVp; use crate::ledger::pos::{self, PosVP}; @@ -33,10 +32,10 @@ use crate::ledger::storage::{DBIter, Storage, StorageHasher, WlStorage, DB}; use crate::ledger::{replay_protection, storage_api}; use crate::proto::{self, Tx}; use crate::types::address::{Address, InternalAddress}; +use crate::types::storage; use crate::types::storage::TxIndex; use crate::types::transaction::protocol::{EthereumTxData, ProtocolTxType}; use crate::types::transaction::{DecryptedTx, TxResult, TxType, VpsResult}; -use crate::types::{hash, storage}; use crate::vm::wasm::{TxCache, VpCache}; use crate::vm::{self, wasm, WasmCacheAccess}; @@ -83,10 +82,6 @@ pub enum Error { EthBridgeNativeVpError(native_vp::ethereum_bridge::vp::Error), #[error("Ethereum bridge pool native VP error: {0}")] BridgePoolNativeVpError(native_vp::ethereum_bridge::bridge_pool_vp::Error), - #[error("Replay protection native VP error: {0}")] - ReplayProtectionNativeVpError( - crate::ledger::native_vp::replay_protection::Error, - ), #[error("Non usable tokens native VP error: {0}")] NutNativeVpError(native_vp::ethereum_bridge::nut::Error), #[error("Access to an internal address {0} is forbidden")] @@ -169,9 +164,12 @@ where apply_protocol_tx(protocol_tx.tx, tx.data(), wl_storage) } TxType::Wrapper(ref wrapper) => { + let fee_unshielding_transaction = + get_fee_unshielding_transaction(&tx, wrapper); let changed_keys = apply_wrapper_tx( + tx, wrapper, - get_fee_unshielding_transaction(&tx, wrapper), + fee_unshielding_transaction, tx_bytes, ShellParams { tx_gas_meter, @@ -212,12 +210,14 @@ where } /// Performs the required operation on a wrapper transaction: -/// - replay protection /// - fee payment /// - gas accounting +/// - replay protection /// -/// Returns the set of changed storage keys. +/// Returns the set of changed storage keys. The caller should write the hash of +/// the wrapper header to storage in case of failure. pub(crate) fn apply_wrapper_tx<'a, D, H, CA, WLS>( + mut tx: Tx, wrapper: &WrapperTx, fee_unshield_transaction: Option<Transaction>, tx_bytes: &[u8], @@ -231,18 +231,6 @@ where WLS: WriteLogAndStorage<D = D, H = H>, { let mut changed_keys = BTreeSet::default(); - let mut tx: Tx = tx_bytes.try_into().unwrap(); - - // Writes wrapper tx hash to block write log (changes must be persisted even - // in case of failure) - let wrapper_hash_key = replay_protection::get_replay_protection_key( - &hash::Hash(tx.header_hash().0), - ); - shell_params - .wl_storage - .write(&wrapper_hash_key, ()) - .expect("Error while writing tx hash to storage"); - changed_keys.insert(wrapper_hash_key); // Charge fee before performing any fallible operations charge_fee( @@ -257,14 +245,13 @@ where shell_params.tx_gas_meter.add_tx_size_gas(tx_bytes)?; // If wrapper was succesful, write inner tx hash to storage - let inner_hash_key = replay_protection::get_replay_protection_key( - &hash::Hash(tx.update_header(TxType::Raw).header_hash().0), - ); shell_params .wl_storage - .write(&inner_hash_key, ()) + .write_tx_hash(tx.update_header(TxType::Raw).header_hash()) .expect("Error while writing tx hash to storage"); - changed_keys.insert(inner_hash_key); + changed_keys.insert(replay_protection::get_replay_protection_last_key( + &tx.header_hash(), + )); Ok(changed_keys) } @@ -934,16 +921,6 @@ where gas_meter = bridge_pool.ctx.gas_meter.into_inner(); result } - InternalAddress::ReplayProtection => { - let replay_protection_vp = - ReplayProtectionVp { ctx }; - let result = replay_protection_vp - .validate_tx(tx, &keys_changed, &verifiers) - .map_err(Error::ReplayProtectionNativeVpError); - gas_meter = - replay_protection_vp.ctx.gas_meter.into_inner(); - result - } InternalAddress::Pgf => { let pgf_vp = PgfVp { ctx }; let result = pgf_vp diff --git a/shared/src/ledger/queries/shell.rs b/shared/src/ledger/queries/shell.rs index a766846916..0e45996ebf 100644 --- a/shared/src/ledger/queries/shell.rs +++ b/shared/src/ledger/queries/shell.rs @@ -122,6 +122,7 @@ where let mut tx_gas_meter = TxGasMeter::new(wrapper.gas_limit.to_owned()); protocol::apply_wrapper_tx( + tx.clone(), &wrapper, None, &request.data,