From 31e130a821cdba0daaa75da051c8c19237efbff0 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Mon, 5 Sep 2022 09:40:12 +0200 Subject: [PATCH] fix: stop race condition in output encumbrance (#4613) Description --- Add a mutex to stop a race condition in the output manager error. Its possible that if more than on transactions happens in near the same time, the transactions will select the same inputs. How Has This Been Tested? --- Unit tests --- base_layer/core/src/mempool/mempool_storage.rs | 8 ++++++-- base_layer/core/src/validation/error.rs | 2 ++ .../src/validation/transaction_validators.rs | 2 +- base_layer/core/tests/mempool.rs | 1 + .../storage/sqlite_db/mod.rs | 17 ++++++++++++++++- 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 9fbe293060..9d1b5751ba 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -96,12 +96,12 @@ impl MempoolStorage { self.unconfirmed_pool.insert(tx, Some(dependent_outputs), &weight)?; Ok(TxStorageResponse::UnconfirmedPool) } else { - debug!(target: LOG_TARGET, "Validation failed due to unknown inputs"); + warn!(target: LOG_TARGET, "Validation failed due to unknown inputs"); Ok(TxStorageResponse::NotStoredOrphan) } }, Err(ValidationError::ContainsSTxO) => { - debug!(target: LOG_TARGET, "Validation failed due to already spent output"); + warn!(target: LOG_TARGET, "Validation failed due to already spent input"); Ok(TxStorageResponse::NotStoredAlreadySpent) }, Err(ValidationError::MaturityError) => { @@ -112,6 +112,10 @@ impl MempoolStorage { warn!(target: LOG_TARGET, "Validation failed due to consensus rule: {}", msg); Ok(TxStorageResponse::NotStoredConsensus) }, + Err(ValidationError::DuplicateKernelError(msg)) => { + warn!(target: LOG_TARGET, "Validation failed due to duplicate kernel: {}", msg); + Ok(TxStorageResponse::NotStoredConsensus) + }, Err(e) => { warn!(target: LOG_TARGET, "Validation failed due to error: {}", e); Ok(TxStorageResponse::NotStored) diff --git a/base_layer/core/src/validation/error.rs b/base_layer/core/src/validation/error.rs index ae34950a65..21604824d0 100644 --- a/base_layer/core/src/validation/error.rs +++ b/base_layer/core/src/validation/error.rs @@ -113,6 +113,8 @@ pub enum ValidationError { }, #[error("Consensus Error: {0}")] ConsensusError(String), + #[error("Duplicate kernel Error: {0}")] + DuplicateKernelError(String), #[error("Covenant failed to validate: {0}")] CovenantError(#[from] CovenantError), #[error("Invalid or unsupported blockchain version {version}")] diff --git a/base_layer/core/src/validation/transaction_validators.rs b/base_layer/core/src/validation/transaction_validators.rs index ae2e819ded..9d5a313789 100644 --- a/base_layer/core/src/validation/transaction_validators.rs +++ b/base_layer/core/src/validation/transaction_validators.rs @@ -172,7 +172,7 @@ impl TxConsensusValidator { db_kernel.excess_sig.get_signature().to_hex(), ); warn!(target: LOG_TARGET, "{}", msg); - return Err(ValidationError::ConsensusError(msg)); + return Err(ValidationError::DuplicateKernelError(msg)); }; } Ok(()) diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs index 87911a2e7b..74f4dfe3ac 100644 --- a/base_layer/core/tests/mempool.rs +++ b/base_layer/core/tests/mempool.rs @@ -1188,6 +1188,7 @@ async fn consensus_validation_unique_excess_sig() { // trying to submit a transaction with an existing excess signature already in the chain is an error let tx = Arc::new(tx1); let response = mempool.insert(tx).await.unwrap(); + dbg!(&response); assert!(matches!(response, TxStorageResponse::NotStoredConsensus)); } diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 7c2ad39bab..f00b6a4cd8 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -22,7 +22,7 @@ use std::{ convert::{TryFrom, TryInto}, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, }; use chacha20poly1305::XChaCha20Poly1305; @@ -68,6 +68,7 @@ const LOG_TARGET: &str = "wallet::output_manager_service::database::wallet"; pub struct OutputManagerSqliteDatabase { database_connection: WalletDbConnection, cipher: Arc>>, + encumber_lock: Arc>, } impl OutputManagerSqliteDatabase { @@ -75,6 +76,7 @@ impl OutputManagerSqliteDatabase { Self { database_connection, cipher: Arc::new(RwLock::new(cipher)), + encumber_lock: Arc::new(Mutex::new(())), } } @@ -661,8 +663,15 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let start = Instant::now(); let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); + // We need to ensure that this whole encumber operation happens inside of a mutex to ensure thread safety as the + // transaction first check checks if it can encumber then encumbers them. + let _guard = self + .encumber_lock + .lock() + .map_err(|e| OutputManagerStorageError::UnexpectedResult(format!("Encumber lock poisoned: {}", e)))?; let mut outputs_to_be_spent = Vec::with_capacity(outputs_to_send.len()); + for i in outputs_to_send { let output = OutputSql::find_by_commitment_and_cancelled(i.commitment.as_bytes(), false, &conn)?; if output.status != (OutputStatus::Unspent as i32) { @@ -714,6 +723,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { let conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); + // We need to ensure that this whole encumber operation happens inside of a mutex to ensure thread safety as the + // transaction first check checks if it can encumber then encumbers them. + let _guard = self + .encumber_lock + .lock() + .map_err(|e| OutputManagerStorageError::UnexpectedResult(format!("Encumber lock poisoned: {}", e)))?; let outputs_to_be_received = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeReceived, &conn)?; for o in &outputs_to_be_received {