diff --git a/base_layer/wallet/src/test_utils.rs b/base_layer/wallet/src/test_utils.rs index 5cf0758919..32da53b26c 100644 --- a/base_layer/wallet/src/test_utils.rs +++ b/base_layer/wallet/src/test_utils.rs @@ -20,12 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{ - contacts_service::storage::sqlite_db::ContactsServiceSqliteDatabase, - output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, - storage::{sqlite_db::WalletSqliteDatabase, sqlite_utilities::run_migration_and_create_sqlite_connection}, - transaction_service::storage::sqlite_db::TransactionServiceSqliteDatabase, -}; +use crate::storage::sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}; use core::iter; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use std::path::Path; @@ -39,15 +34,7 @@ pub fn random_string(len: usize) -> String { } /// A test helper to create a temporary wallet service databases -pub fn make_wallet_databases( - path: Option, -) -> ( - WalletSqliteDatabase, - TransactionServiceSqliteDatabase, - OutputManagerSqliteDatabase, - ContactsServiceSqliteDatabase, - Option, -) { +pub fn make_wallet_database_connection(path: Option) -> (WalletDbConnection, Option) { let (path_string, temp_dir): (String, Option) = if let Some(p) = path { (p, None) } else { @@ -61,11 +48,5 @@ pub fn make_wallet_databases( let connection = run_migration_and_create_sqlite_connection(&db_path.to_str().expect("Should be able to make path")).unwrap(); - ( - WalletSqliteDatabase::new(connection.clone(), None).expect("Should be able to create wallet database"), - TransactionServiceSqliteDatabase::new(connection.clone(), None), - OutputManagerSqliteDatabase::new(connection.clone(), None), - ContactsServiceSqliteDatabase::new(connection), - temp_dir, - ) + (connection, temp_dir) } diff --git a/base_layer/wallet/src/transaction_service/error.rs b/base_layer/wallet/src/transaction_service/error.rs index 05e9e4af2b..6a2a8af144 100644 --- a/base_layer/wallet/src/transaction_service/error.rs +++ b/base_layer/wallet/src/transaction_service/error.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ + error::WalletStorageError, output_manager_service::{error::OutputManagerError, TxId}, transaction_service::storage::database::DbKey, }; @@ -100,6 +101,8 @@ pub enum TransactionServiceError { TransportChannelError(#[from] TransportChannelError), #[error("Transaction storage error: `{0}`")] TransactionStorageError(#[from] TransactionStorageError), + #[error("Wallet storage error: `{0}`")] + WalletStorageError(#[from] WalletStorageError), #[error("Invalid message error: `{0}`")] InvalidMessageError(String), #[error("Transaction error: `{0}`")] @@ -140,6 +143,8 @@ pub enum TransactionServiceError { ByteArrayError(#[from] tari_crypto::tari_utilities::ByteArrayError), #[error("Transaction Service Error: `{0}`")] ServiceError(String), + #[error("Wallet Recovery in progress so Transaction Service Messaging Requests ignored")] + WalletRecoveryInProgress, } #[derive(Debug, Error)] diff --git a/base_layer/wallet/src/transaction_service/mod.rs b/base_layer/wallet/src/transaction_service/mod.rs index 541d898770..48b3484dca 100644 --- a/base_layer/wallet/src/transaction_service/mod.rs +++ b/base_layer/wallet/src/transaction_service/mod.rs @@ -22,6 +22,16 @@ use std::sync::Arc; +use crate::{ + output_manager_service::handle::OutputManagerHandle, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::{ + config::TransactionServiceConfig, + handle::TransactionServiceHandle, + service::TransactionService, + storage::database::{TransactionBackend, TransactionDatabase}, + }, +}; use futures::{Stream, StreamExt}; use log::*; use tokio::sync::broadcast; @@ -46,16 +56,6 @@ use tari_service_framework::{ ServiceInitializerContext, }; -use crate::{ - output_manager_service::handle::OutputManagerHandle, - transaction_service::{ - config::TransactionServiceConfig, - handle::TransactionServiceHandle, - service::TransactionService, - storage::database::{TransactionBackend, TransactionDatabase}, - }, -}; - pub mod config; pub mod error; pub mod handle; @@ -67,18 +67,23 @@ pub mod tasks; const LOG_TARGET: &str = "wallet::transaction_service"; const SUBSCRIPTION_LABEL: &str = "Transaction Service"; -pub struct TransactionServiceInitializer -where T: TransactionBackend +pub struct TransactionServiceInitializer +where + T: TransactionBackend, + W: WalletBackend, { config: TransactionServiceConfig, subscription_factory: Arc, - backend: Option, + tx_backend: Option, node_identity: Arc, factories: CryptoFactories, + wallet_database: Option>, } -impl TransactionServiceInitializer -where T: TransactionBackend +impl TransactionServiceInitializer +where + T: TransactionBackend, + W: WalletBackend, { pub fn new( config: TransactionServiceConfig, @@ -86,13 +91,15 @@ where T: TransactionBackend backend: T, node_identity: Arc, factories: CryptoFactories, + wallet_database: WalletDatabase, ) -> Self { Self { config, subscription_factory, - backend: Some(backend), + tx_backend: Some(backend), node_identity, factories, + wallet_database: Some(wallet_database), } } @@ -164,8 +171,10 @@ where T: TransactionBackend } #[async_trait] -impl ServiceInitializer for TransactionServiceInitializer -where T: TransactionBackend + 'static +impl ServiceInitializer for TransactionServiceInitializer +where + T: TransactionBackend + 'static, + W: WalletBackend + 'static, { async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> { let (sender, receiver) = reply_channel::unbounded(); @@ -182,11 +191,16 @@ where T: TransactionBackend + 'static // Register handle before waiting for handles to be ready context.register_handle(transaction_handle); - let backend = self - .backend + let tx_backend = self + .tx_backend .take() .expect("Cannot start Transaction Service without providing a backend"); + let wallet_database = self + .wallet_database + .take() + .expect("Cannot start Transaction Service without providing a wallet database"); + let node_identity = self.node_identity.clone(); let factories = self.factories.clone(); let config = self.config.clone(); @@ -198,7 +212,8 @@ where T: TransactionBackend + 'static let result = TransactionService::new( config, - TransactionDatabase::new(backend), + TransactionDatabase::new(tx_backend), + wallet_database, receiver, transaction_stream, transaction_reply_stream, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 12aa052382..c5fbb892d2 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -20,21 +20,44 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{ - collections::{HashMap, HashSet}, - convert::TryInto, - sync::Arc, - time::{Duration, Instant}, +use crate::{ + output_manager_service::{handle::OutputManagerHandle, TxId}, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::{ + config::TransactionServiceConfig, + error::{TransactionServiceError, TransactionServiceProtocolError}, + handle::{TransactionEvent, TransactionEventSender, TransactionServiceRequest, TransactionServiceResponse}, + protocols::{ + transaction_broadcast_protocol::TransactionBroadcastProtocol, + transaction_coinbase_monitoring_protocol::TransactionCoinbaseMonitoringProtocol, + transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage}, + transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage}, + transaction_validation_protocol::TransactionValidationProtocol, + }, + storage::{ + database::{TransactionBackend, TransactionDatabase}, + models::{CompletedTransaction, TransactionDirection, TransactionStatus}, + }, + tasks::{ + send_finalized_transaction::send_finalized_transaction_message, + send_transaction_cancelled::send_transaction_cancelled_message, + send_transaction_reply::send_transaction_reply, + }, + }, + types::{HashDigest, ValidationRetryStrategy}, + utxo_scanner_service::utxo_scanning::RECOVERY_KEY, }; - use chrono::{NaiveDateTime, Utc}; use digest::Digest; use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt}; use log::*; use rand::{rngs::OsRng, RngCore}; -use tari_crypto::{keys::DiffieHellmanSharedSecret, script, tari_utilities::ByteArray}; -use tokio::{sync::broadcast, task::JoinHandle}; - +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, + sync::Arc, + time::{Duration, Instant}, +}; use tari_common_types::types::PrivateKey; use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeIdentity, types::CommsPublicKey}; use tari_comms_dht::outbound::OutboundMessageRequester; @@ -54,35 +77,13 @@ use tari_core::{ ReceiverTransactionProtocol, }, }; +use tari_crypto::{keys::DiffieHellmanSharedSecret, script, tari_utilities::ByteArray}; use tari_p2p::domain_message::DomainMessage; use tari_service_framework::{reply_channel, reply_channel::Receiver}; use tari_shutdown::ShutdownSignal; -use tokio::sync::{mpsc, mpsc::Sender, oneshot}; - -use crate::{ - output_manager_service::{handle::OutputManagerHandle, TxId}, - transaction_service::{ - config::TransactionServiceConfig, - error::{TransactionServiceError, TransactionServiceProtocolError}, - handle::{TransactionEvent, TransactionEventSender, TransactionServiceRequest, TransactionServiceResponse}, - protocols::{ - transaction_broadcast_protocol::TransactionBroadcastProtocol, - transaction_coinbase_monitoring_protocol::TransactionCoinbaseMonitoringProtocol, - transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage}, - transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage}, - transaction_validation_protocol::TransactionValidationProtocol, - }, - storage::{ - database::{TransactionBackend, TransactionDatabase}, - models::{CompletedTransaction, TransactionDirection, TransactionStatus}, - }, - tasks::{ - send_finalized_transaction::send_finalized_transaction_message, - send_transaction_cancelled::send_transaction_cancelled_message, - send_transaction_reply::send_transaction_reply, - }, - }, - types::{HashDigest, ValidationRetryStrategy}, +use tokio::{ + sync::{broadcast, mpsc, mpsc::Sender, oneshot}, + task::JoinHandle, }; const LOG_TARGET: &str = "wallet::transaction_service::service"; @@ -107,7 +108,10 @@ pub struct TransactionService< BNResponseStream, TBackend, TTxCancelledStream, -> where TBackend: TransactionBackend + 'static + WBackend, +> where + TBackend: TransactionBackend + 'static, + WBackend: WalletBackend + 'static, { config: TransactionServiceConfig, db: TransactionDatabase, @@ -134,11 +138,20 @@ pub struct TransactionService< timeout_update_publisher: broadcast::Sender, base_node_update_publisher: broadcast::Sender, power_mode: PowerMode, + wallet_db: WalletDatabase, } #[allow(clippy::too_many_arguments)] -impl - TransactionService +impl + TransactionService< + TTxStream, + TTxReplyStream, + TTxFinalizedStream, + BNResponseStream, + TBackend, + TTxCancelledStream, + WBackend, + > where TTxStream: Stream>, TTxReplyStream: Stream>, @@ -146,10 +159,12 @@ where BNResponseStream: Stream>, TTxCancelledStream: Stream>, TBackend: TransactionBackend + 'static, + WBackend: WalletBackend + 'static, { pub fn new( config: TransactionServiceConfig, db: TransactionDatabase, + wallet_db: WalletDatabase, request_stream: Receiver< TransactionServiceRequest, Result, @@ -208,6 +223,7 @@ where timeout_update_publisher, base_node_update_publisher, power_mode: PowerMode::Normal, + wallet_db, } } @@ -316,7 +332,7 @@ where msg.dht_header.message_tag); } Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: {}", + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {} for NodeID: {}, Trace: {}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_tag); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling \ Transaction Sender message: {:?}", e).to_string()))); @@ -346,7 +362,7 @@ where msg.dht_header.message_tag); }, Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} \ + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {} \ for NodeId: {}, Trace: {}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_tag); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling \ @@ -384,7 +400,7 @@ where msg.dht_header.message_tag); }, Err(e) => { - warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} \ + warn!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {} \ for NodeID: {}, Trace: {}", e , self.node_identity.node_id().short_str(), msg.dht_header.message_tag.as_value()); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction \ @@ -879,6 +895,9 @@ where source_pubkey: CommsPublicKey, recipient_reply: proto::RecipientSignedMessage, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let recipient_reply: RecipientSignedMessage = recipient_reply .try_into() .map_err(TransactionServiceError::InvalidMessageError)?; @@ -1181,6 +1200,9 @@ where traced_message_tag: u64, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let sender_message: TransactionSenderMessage = sender_message .try_into() .map_err(TransactionServiceError::InvalidMessageError)?; @@ -1289,6 +1311,9 @@ where finalized_transaction: proto::TransactionFinalizedMessage, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { + // Check if a wallet recovery is in progress, if it is we will ignore this request + self.check_recovery_status().await?; + let tx_id = finalized_transaction.tx_id; let transaction: Transaction = finalized_transaction .transaction @@ -2025,6 +2050,16 @@ where Ok(()) } + + /// Check if a Recovery Status is currently stored in the databse, this indicates that a wallet recovery is in + /// progress + async fn check_recovery_status(&self) -> Result<(), TransactionServiceError> { + let value = self.wallet_db.get_client_key_value(RECOVERY_KEY.to_owned()).await?; + match value { + None => Ok(()), + Some(_) => Err(TransactionServiceError::WalletRecoveryInProgress), + } + } } /// This struct is a collection of the common resources that a protocol in the service requires. diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index f29b78cdfc..72733c459b 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -187,6 +187,7 @@ where transaction_backend, node_identity.clone(), factories.clone(), + wallet_database.clone(), )) .add_initializer(ContactsServiceInitializer::new(contacts_backend)) .add_initializer(BaseNodeServiceInitializer::new( diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 6e1265460d..a7db549994 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -115,17 +115,18 @@ use tari_wallet::{ handle::OutputManagerHandle, service::OutputManagerService, storage::{ - database::{OutputManagerBackend, OutputManagerDatabase}, + database::OutputManagerDatabase, models::KnownOneSidedPaymentScript, sqlite_db::OutputManagerSqliteDatabase, }, OutputManagerServiceInitializer, }, storage::{ - database::{WalletBackend, WalletDatabase}, - sqlite_utilities::run_migration_and_create_sqlite_connection, + database::WalletDatabase, + sqlite_db::WalletSqliteDatabase, + sqlite_utilities::{run_migration_and_create_sqlite_connection, WalletDbConnection}, }, - test_utils::make_wallet_databases, + test_utils::make_wallet_database_connection, transaction_service::{ config::TransactionServiceConfig, error::TransactionServiceError, @@ -158,19 +159,12 @@ fn create_runtime() -> Runtime { } #[allow(clippy::too_many_arguments)] -pub fn setup_transaction_service< - W: WalletBackend + 'static, - T: TransactionBackend + 'static, - K: OutputManagerBackend + 'static, - P: AsRef, ->( +pub fn setup_transaction_service>( runtime: &mut Runtime, node_identity: Arc, peers: Vec>, factories: CryptoFactories, - wallet_backend: W, - tx_backend: T, - oms_backend: K, + db_connection: WalletDbConnection, database_path: P, discovery_request_timeout: Duration, shutdown_signal: ShutdownSignal, @@ -187,11 +181,14 @@ pub fn setup_transaction_service< shutdown_signal.clone(), )); - let db = WalletDatabase::new(wallet_backend); + let db = WalletDatabase::new(WalletSqliteDatabase::new(db_connection.clone(), None).unwrap()); let metadata = ChainMetadata::new(std::u64::MAX, Vec::new(), 0, 0, 0); runtime.block_on(db.set_chain_metadata(metadata)).unwrap(); + let ts_backend = TransactionServiceSqliteDatabase::new(db_connection.clone(), None); + let oms_backend = OutputManagerSqliteDatabase::new(db_connection, None); + let fut = StackBuilder::new(shutdown_signal) .add_initializer(RegisterHandle::new(dht)) .add_initializer(RegisterHandle::new(comms.connectivity())) @@ -211,9 +208,10 @@ pub fn setup_transaction_service< ..Default::default() }, subscription_factory, - tx_backend, + ts_backend, comms.node_identity(), factories, + db.clone(), )) .add_initializer(BaseNodeServiceInitializer::new(BaseNodeServiceConfig::default(), db)) .add_initializer(WalletConnectivityInitializer::new(BaseNodeServiceConfig::default())) @@ -230,11 +228,10 @@ pub fn setup_transaction_service< /// This utility function creates a Transaction service without using the Service Framework Stack and exposes all the /// streams for testing purposes. #[allow(clippy::type_complexity)] -pub fn setup_transaction_service_no_comms( +pub fn setup_transaction_service_no_comms( runtime: &mut Runtime, factories: CryptoFactories, - tx_backend: T, - oms_backend: K, + db_connection: WalletDbConnection, config: Option, ) -> ( TransactionServiceHandle, @@ -251,18 +248,14 @@ pub fn setup_transaction_service_no_comms, BaseNodeWalletRpcMockState, ) { - setup_transaction_service_no_comms_and_oms_backend(runtime, factories, tx_backend, oms_backend, config) + setup_transaction_service_no_comms_and_oms_backend(runtime, factories, db_connection, config) } #[allow(clippy::type_complexity)] -pub fn setup_transaction_service_no_comms_and_oms_backend< - T: TransactionBackend + 'static, - S: OutputManagerBackend + 'static, ->( +pub fn setup_transaction_service_no_comms_and_oms_backend( runtime: &mut Runtime, factories: CryptoFactories, - tx_backend: T, - oms_backend: S, + db_connection: WalletDbConnection, config: Option, ) -> ( TransactionServiceHandle, @@ -336,12 +329,18 @@ pub fn setup_transaction_service_no_comms_and_oms_backend< mock_base_node_service.set_default_base_node_state(); runtime.spawn(mock_base_node_service.run()); + let wallet_db = WalletDatabase::new( + WalletSqliteDatabase::new(db_connection.clone(), None).expect("Should be able to create wallet database"), + ); + let ts_db = TransactionDatabase::new(TransactionServiceSqliteDatabase::new(db_connection.clone(), None)); + let oms_db = OutputManagerDatabase::new(OutputManagerSqliteDatabase::new(db_connection, None)); + let output_manager_service = runtime .block_on(OutputManagerService::new( OutputManagerServiceConfig::default(), ts_handle.clone(), oms_request_receiver, - OutputManagerDatabase::new(oms_backend), + oms_db, oms_event_publisher.clone(), factories.clone(), constants, @@ -369,7 +368,8 @@ pub fn setup_transaction_service_no_comms_and_oms_backend< let ts_service = TransactionService::new( test_config, - TransactionDatabase::new(tx_backend), + ts_db, + wallet_db, ts_request_receiver, tx_receiver, tx_ack_receiver, @@ -490,10 +490,8 @@ fn manage_single_transaction() { ); let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -501,9 +499,7 @@ fn manage_single_transaction() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path.clone(), Duration::from_secs(0), shutdown.to_signal(), @@ -521,9 +517,7 @@ fn manage_single_transaction() { bob_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -648,8 +642,7 @@ fn single_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -657,9 +650,7 @@ fn single_transaction_to_self() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + db_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -740,8 +731,7 @@ fn send_one_sided_transaction_to_other() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (db_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, mut alice_oms, _alice_comms) = setup_transaction_service( @@ -749,9 +739,7 @@ fn send_one_sided_transaction_to_other() { alice_node_identity, vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + db_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -861,10 +849,8 @@ fn recover_one_sided_transaction() { let database_path = temp_dir.path().to_str().unwrap().to_string(); let database_path2 = temp_dir2.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path2.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path2.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, alice_oms, _alice_comms) = setup_transaction_service( @@ -872,9 +858,7 @@ fn recover_one_sided_transaction() { alice_node_identity, vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -885,9 +869,7 @@ fn recover_one_sided_transaction() { bob_node_identity.clone(), vec![], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path2, Duration::from_secs(0), shutdown.to_signal(), @@ -976,8 +958,7 @@ fn send_one_sided_transaction_to_self() { let temp_dir = tempdir().unwrap(); let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let shutdown = Shutdown::new(); let (mut alice_ts, alice_oms, _alice_comms) = setup_transaction_service( @@ -985,9 +966,7 @@ fn send_one_sided_transaction_to_self() { alice_node_identity.clone(), vec![], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path, Duration::from_secs(0), shutdown.to_signal(), @@ -1061,12 +1040,9 @@ fn manage_multiple_transactions() { let database_path = temp_dir.path().to_str().unwrap().to_string(); - let (alice_wallet_backend, alice_backend, alice_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (bob_wallet_backend, bob_backend, bob_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); - let (carol_wallet_backend, carol_backend, carol_oms_backend, _, _tempdir) = - make_wallet_databases(Some(database_path.clone())); + let (alice_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (bob_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); + let (carol_connection, _tempdir) = make_wallet_database_connection(Some(database_path.clone())); let mut shutdown = Shutdown::new(); @@ -1075,9 +1051,7 @@ fn manage_multiple_transactions() { alice_node_identity.clone(), vec![bob_node_identity.clone(), carol_node_identity.clone()], factories.clone(), - alice_wallet_backend, - alice_backend, - alice_oms_backend, + alice_connection, database_path.clone(), Duration::from_secs(60), shutdown.to_signal(), @@ -1092,9 +1066,7 @@ fn manage_multiple_transactions() { bob_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - bob_wallet_backend, - bob_backend, - bob_oms_backend, + bob_connection, database_path.clone(), Duration::from_secs(1), shutdown.to_signal(), @@ -1107,9 +1079,7 @@ fn manage_multiple_transactions() { carol_node_identity.clone(), vec![alice_node_identity.clone()], factories.clone(), - carol_wallet_backend, - carol_backend, - carol_oms_backend, + carol_connection, database_path, Duration::from_secs(1), shutdown.to_signal(), @@ -1299,8 +1269,6 @@ fn test_accepting_unknown_tx_id_and_malformed_reply() { let alice_db_name = format!("{}.sqlite3", random::string(8).as_str()); let alice_db_path = format!("{}/{}", path_string, alice_db_name); let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection_alice, None); let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); @@ -1318,7 +1286,7 @@ fn test_accepting_unknown_tx_id_and_malformed_reply() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); @@ -1415,11 +1383,6 @@ fn finalize_tx_with_incorrect_pubkey() { let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); let connection_bob = run_migration_and_create_sqlite_connection(&bob_db_path).unwrap(); - let alice_oms_backend = OutputManagerSqliteDatabase::new(connection_alice.clone(), None); - let bob_oms_backend = OutputManagerSqliteDatabase::new(connection_bob.clone(), None); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice, None); - let bob_backend = TransactionServiceSqliteDatabase::new(connection_bob, None); - let ( mut alice_ts, _alice_output_manager, @@ -1434,7 +1397,7 @@ fn finalize_tx_with_incorrect_pubkey() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, alice_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); let bob_node_identity = @@ -1453,7 +1416,7 @@ fn finalize_tx_with_incorrect_pubkey() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_bob, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1545,10 +1508,6 @@ fn finalize_tx_with_missing_output() { let bob_db_path = format!("{}/{}", path_string, bob_db_name); let connection_alice = run_migration_and_create_sqlite_connection(&alice_db_path).unwrap(); let connection_bob = run_migration_and_create_sqlite_connection(&bob_db_path).unwrap(); - let alice_oms_backend = OutputManagerSqliteDatabase::new(connection_alice.clone(), None); - let bob_oms_backend = OutputManagerSqliteDatabase::new(connection_bob.clone(), None); - let alice_backend = TransactionServiceSqliteDatabase::new(connection_alice, None); - let bob_backend = TransactionServiceSqliteDatabase::new(connection_bob, None); let ( mut alice_ts, @@ -1564,7 +1523,7 @@ fn finalize_tx_with_missing_output() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), alice_backend, alice_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_alice, None); let mut alice_event_stream = alice_ts.get_event_stream(); let bob_node_identity = @@ -1583,7 +1542,7 @@ fn finalize_tx_with_missing_output() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection_bob, None); let (_utxo, uo) = make_input(&mut OsRng, MicroTari(250000), &factories.commitment); @@ -1710,31 +1669,27 @@ fn discovery_async_return_test() { ); let mut shutdown = Shutdown::new(); - let (carol_wallet_backend, carol_db, carol_oms_db, _, _temp_dir1) = make_wallet_databases(None); + let (carol_connection, _temp_dir1) = make_wallet_database_connection(None); let (_carol_ts, _carol_oms, carol_comms) = setup_transaction_service( &mut runtime, carol_node_identity.clone(), vec![], factories.clone(), - carol_wallet_backend, - carol_db, - carol_oms_db, + carol_connection, db_folder.join("carol"), Duration::from_secs(1), shutdown.to_signal(), ); - let (alice_wallet_backend, alice_db, alice_oms_db, _, _temp_dir2) = make_wallet_databases(None); + let (alice_connection, _temp_dir2) = make_wallet_database_connection(None); let (mut alice_ts, mut alice_oms, alice_comms) = setup_transaction_service( &mut runtime, alice_node_identity, vec![carol_node_identity.clone()], factories.clone(), - alice_wallet_backend, - alice_db, - alice_oms_db, + alice_connection, db_folder.join("alice"), Duration::from_secs(20), shutdown.to_signal(), @@ -1855,7 +1810,8 @@ fn discovery_async_return_test() { fn test_power_mode_updates() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_wallet_backend, tx_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); + let tx_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -1936,7 +1892,7 @@ fn test_power_mode_updates() { _, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -1969,19 +1925,12 @@ fn test_set_num_confirmations() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let db_name = format!("{}.sqlite3", random::string(8).as_str()); - let temp_dir = tempdir().unwrap(); - let db_folder = temp_dir.path().to_str().unwrap().to_string(); - let connection = run_migration_and_create_sqlite_connection(&format!("{}/{}", db_folder, db_name)).unwrap(); - - let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (mut ts, _, _, _, _, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories, - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2012,13 +1961,7 @@ fn test_transaction_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let db_name = format!("{}.sqlite3", random::string(8).as_str()); - let temp_dir = tempdir().unwrap(); - let db_folder = temp_dir.path().to_str().unwrap().to_string(); - let connection = run_migration_and_create_sqlite_connection(&format!("{}/{}", db_folder, db_name)).unwrap(); - - let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2037,8 +1980,7 @@ fn test_transaction_cancellation() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2330,7 +2272,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_wallet_backend, tx_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2346,7 +2288,7 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let alice_total_available = 250000 * uT; let (_utxo, uo) = make_input(&mut OsRng, alice_total_available, &factories.commitment); @@ -2384,15 +2326,14 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { }, }; assert_eq!(tx_id, msg_tx_id); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); // Test sending the Reply to a receiver with Direct and then with SAF and never both let (_bob_ts, _, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2427,14 +2368,13 @@ fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { runtime.block_on(async { sleep(Duration::from_secs(5)).await }); assert_eq!(bob_outbound_service.call_count(), 0, "Should be no more calls"); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (_bob2_ts, _, bob2_outbound_service, _, mut bob2_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - backend, - oms_backend, + connection, Some(TransactionServiceConfig { broadcast_monitoring_timeout: Duration::from_secs(20), chain_monitoring_timeout: Duration::from_secs(20), @@ -2570,7 +2510,7 @@ fn test_tx_direct_send_behaviour() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_wallet_backend, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2586,7 +2526,7 @@ fn test_tx_direct_send_behaviour() { _, _, _, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); let (_utxo, uo) = make_input(&mut OsRng, 1000000 * uT, &factories.commitment); @@ -2776,8 +2716,11 @@ tokio::pin!(delay); fn test_restarting_transaction_protocols() { let mut runtime = Runtime::new().unwrap(); let factories = CryptoFactories::default(); - let (_wallet_backend, alice_backend, alice_oms_backend, _, _temp_dir) = make_wallet_databases(None); - let (_, bob_backend, bob_oms_backend, _, _temp_dir2) = make_wallet_databases(None); + let (alice_connection, _temp_dir) = make_wallet_database_connection(None); + let alice_backend = TransactionServiceSqliteDatabase::new(alice_connection.clone(), None); + + let (bob_connection, _temp_dir2) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); let base_node_identity = Arc::new(NodeIdentity::random( &mut OsRng, @@ -2894,7 +2837,7 @@ fn test_restarting_transaction_protocols() { // Test that Bob's node restarts the send protocol let (mut bob_ts, _bob_oms, _bob_outbound_service, _, _, mut bob_tx_reply, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_backend, bob_oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), bob_connection, None); let mut bob_event_stream = bob_ts.get_event_stream(); runtime @@ -2929,7 +2872,7 @@ fn test_restarting_transaction_protocols() { // Test Alice's node restarts the receive protocol let (mut alice_ts, _alice_oms, _alice_outbound_service, _, _, _, mut alice_tx_finalized, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories, alice_backend, alice_oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, alice_connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime @@ -2976,7 +2919,7 @@ fn test_coinbase_transactions_rejection_same_height() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -2992,7 +2935,7 @@ fn test_coinbase_transactions_rejection_same_height() { _mock_rpc_server, _server_node_identity, _rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let block_height_a = 10; let block_height_b = block_height_a + 1; @@ -3074,7 +3017,7 @@ fn test_coinbase_monitoring_stuck_in_mempool() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3090,7 +3033,7 @@ fn test_coinbase_monitoring_stuck_in_mempool() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3245,7 +3188,7 @@ fn test_coinbase_monitoring_with_base_node_change_and_mined() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3261,7 +3204,7 @@ fn test_coinbase_monitoring_with_base_node_change_and_mined() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3449,7 +3392,7 @@ fn test_coinbase_monitoring_mined_not_synced() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3465,7 +3408,7 @@ fn test_coinbase_monitoring_mined_not_synced() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); rpc_service_state.set_response_delay(Some(Duration::from_secs(1))); @@ -3620,10 +3563,10 @@ fn test_coinbase_monitoring_mined_not_synced() { fn test_coinbase_transaction_reused_for_same_height() { let factories = CryptoFactories::default(); let mut runtime = Runtime::new().unwrap(); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let (mut tx_service, mut output_service, _, _, _, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); let blockheight1 = 10; let fees1 = 2000 * uT; @@ -3706,7 +3649,7 @@ fn test_transaction_resending() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Setup Alice wallet with no comms stack - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -3725,8 +3668,7 @@ fn test_transaction_resending() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3767,7 +3709,7 @@ fn test_transaction_resending() { } // Setup Bob's wallet with no comms stack - let (_, bob_backend, bob_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (connection, _tempdir) = make_wallet_database_connection(None); let ( _bob_ts, @@ -3786,8 +3728,7 @@ fn test_transaction_resending() { ) = setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend, - bob_oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3952,7 +3893,8 @@ fn test_resend_on_startup() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, alice_backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); + let alice_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); alice_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( tx_id, @@ -3964,8 +3906,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - oms_backend, + connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -3991,7 +3932,8 @@ fn test_resend_on_startup() { outbound_tx.send_count = 1; outbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (_, alice_backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); + let alice_backend2 = TransactionServiceSqliteDatabase::new(connection2.clone(), None); alice_backend2 .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( @@ -4004,8 +3946,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend2, - oms_backend2, + connection2, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4056,7 +3997,8 @@ fn test_resend_on_startup() { send_count: 0, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, bob_backend, bob_oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); bob_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( @@ -4069,8 +4011,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4093,8 +4034,8 @@ fn test_resend_on_startup() { // Now we do it again with the timestamp prior to the cooldown and see that a message is sent inbound_tx.send_count = 1; inbound_tx.last_send_timestamp = Utc::now().naive_utc().checked_sub_signed(ChronoDuration::seconds(20)); - let (_, bob_backend2, bob_oms_backend2, _, _temp_dir2) = make_wallet_databases(None); - + let (bob_connection2, _temp_dir2) = make_wallet_database_connection(None); + let bob_backend2 = TransactionServiceSqliteDatabase::new(bob_connection2.clone(), None); bob_backend2 .write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( tx_id, @@ -4106,8 +4047,7 @@ fn test_resend_on_startup() { setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend2, - bob_oms_backend2, + bob_connection2, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4144,7 +4084,7 @@ fn test_replying_to_cancelled_tx() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (alice_connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4163,8 +4103,7 @@ fn test_replying_to_cancelled_tx() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + alice_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4203,7 +4142,7 @@ fn test_replying_to_cancelled_tx() { runtime.block_on(alice_ts.cancel_transaction(tx_id)).unwrap(); // Setup Bob's wallet with no comms stack - let (_, bob_backend, bob_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (bob_connection, _tempdir) = make_wallet_database_connection(None); let ( _bob_ts, @@ -4222,8 +4161,7 @@ fn test_replying_to_cancelled_tx() { ) = setup_transaction_service_no_comms( &mut runtime, factories, - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4276,7 +4214,7 @@ fn test_transaction_timeout_cancellation() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); // Testing if a Tx Reply is received for a Cancelled Outbound Tx that a Cancelled message is sent back: - let (_, alice_backend, alice_oms_backend, _, _tempdir) = make_wallet_databases(None); + let (alice_connection, _tempdir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4295,8 +4233,7 @@ fn test_transaction_timeout_cancellation() { ) = setup_transaction_service_no_comms( &mut runtime, factories.clone(), - alice_backend, - alice_oms_backend, + alice_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4404,8 +4341,8 @@ fn test_transaction_timeout_cancellation() { send_count: 1, last_send_timestamp: Some(Utc::now().naive_utc()), }; - let (_, bob_backend, bob_oms_backend, _, _temp_dir) = make_wallet_databases(None); - + let (bob_connection, _temp_dir) = make_wallet_database_connection(None); + let bob_backend = TransactionServiceSqliteDatabase::new(bob_connection.clone(), None); bob_backend .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( tx_id, @@ -4417,8 +4354,7 @@ fn test_transaction_timeout_cancellation() { setup_transaction_service_no_comms( &mut runtime, factories.clone(), - bob_backend, - bob_oms_backend, + bob_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4445,15 +4381,14 @@ fn test_transaction_timeout_cancellation() { let call = bob_outbound_service.pop_call().unwrap(); let bob_cancelled_message = try_decode_transaction_cancelled_message(call.1.to_vec()).unwrap(); assert_eq!(bob_cancelled_message.tx_id, tx_id); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (carol_connection, _temp_dir) = make_wallet_database_connection(None); // Now to do this for the Receiver let (carol_ts, _, carol_outbound_service, _, mut carol_tx_sender, _, _, _, _, _shutdown, _, _, _) = setup_transaction_service_no_comms( &mut runtime, factories, - backend, - oms_backend, + carol_connection, Some(TransactionServiceConfig { transaction_resend_period: Duration::from_secs(10), resend_response_cooldown: Duration::from_secs(5), @@ -4520,7 +4455,7 @@ fn transaction_service_tx_broadcast() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4536,16 +4471,16 @@ fn transaction_service_tx_broadcast() { _mock_rpc_server, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) .unwrap(); - let (_, backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); let (_bob_ts, _bob_output_manager, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend2, oms_backend2, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection2, None); let alice_output_value = MicroTari(250000); @@ -4826,8 +4761,8 @@ fn transaction_service_tx_broadcast() { fn broadcast_all_completed_transactions_on_startup() { let mut runtime = Runtime::new().unwrap(); let factories = CryptoFactories::default(); - let (_, db, oms_db, _, _temp_dir) = make_wallet_databases(None); - + let (connection, _temp_dir) = make_wallet_database_connection(None); + let db = TransactionServiceSqliteDatabase::new(connection.clone(), None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) .with_signature(&Signature::default()) @@ -4893,7 +4828,7 @@ fn broadcast_all_completed_transactions_on_startup() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, db, oms_db, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); rpc_service_state.set_transaction_query_response(TxQueryResponse { location: TxLocation::Mined, @@ -4960,7 +4895,7 @@ fn transaction_service_tx_broadcast_with_base_node_change() { let bob_node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE); - let (_, backend, oms_backend, _, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let ( mut alice_ts, @@ -4976,16 +4911,16 @@ fn transaction_service_tx_broadcast_with_base_node_change() { _mock_rpc_server, server_node_identity, rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection, None); let mut alice_event_stream = alice_ts.get_event_stream(); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) .unwrap(); - let (_, backend2, oms_backend2, _, _temp_dir2) = make_wallet_databases(None); + let (connection2, _temp_dir2) = make_wallet_database_connection(None); let (_bob_ts, _bob_output_manager, bob_outbound_service, _, mut bob_tx_sender, _, _, _, _, _shutdown, _, _, _) = - setup_transaction_service_no_comms(&mut runtime, factories.clone(), backend2, oms_backend2, None); + setup_transaction_service_no_comms(&mut runtime, factories.clone(), connection2, None); let alice_output_value = MicroTari(250000); @@ -5171,7 +5106,6 @@ fn only_start_one_tx_broadcast_protocol_at_a_time() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -5215,7 +5149,7 @@ fn only_start_one_tx_broadcast_protocol_at_a_time() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -5239,7 +5173,6 @@ fn dont_broadcast_invalid_transactions() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); let kernel = KernelBuilder::new() .with_excess(&factories.commitment.zero()) @@ -5283,7 +5216,7 @@ fn dont_broadcast_invalid_transactions() { .unwrap(); let (mut alice_ts, _, _, _, _, _, _, _, _, _shutdown, _mock_rpc_server, server_node_identity, rpc_service_state) = - setup_transaction_service_no_comms(&mut runtime, factories, backend, oms_backend, None); + setup_transaction_service_no_comms(&mut runtime, factories, connection, None); runtime .block_on(alice_ts.set_base_node_public_key(server_node_identity.public_key().clone())) @@ -5306,9 +5239,8 @@ fn start_validation_protocol_then_broadcast_protocol_change_base_node() { let db_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), db_name); let connection = run_migration_and_create_sqlite_connection(&db_path).unwrap(); let tx_backend = TransactionServiceSqliteDatabase::new(connection.clone(), None); - let oms_backend = OutputManagerSqliteDatabase::new(connection, None); - let db = TransactionDatabase::new(tx_backend.clone()); + let db = TransactionDatabase::new(tx_backend); runtime.block_on(add_transaction_to_database( 1, @@ -5370,7 +5302,7 @@ fn start_validation_protocol_then_broadcast_protocol_change_base_node() { _mock_rpc_server, server_node_identity, mut rpc_service_state, - ) = setup_transaction_service_no_comms(&mut runtime, factories, tx_backend, oms_backend, None); + ) = setup_transaction_service_no_comms(&mut runtime, factories, connection, None); rpc_service_state.set_transaction_query_response(TxQueryResponse { location: TxLocation::Mined, diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs index 671d3db23c..3fc3e06086 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet/mod.rs @@ -57,8 +57,9 @@ use tari_p2p::{initialization::CommsConfig, transport::TransportType, Network, D use tari_shutdown::{Shutdown, ShutdownSignal}; use tari_test_utils::random; use tari_wallet::{ - contacts_service::storage::database::Contact, + contacts_service::storage::{database::Contact, sqlite_db::ContactsServiceSqliteDatabase}, error::{WalletError, WalletStorageError}, + output_manager_service::storage::sqlite_db::OutputManagerSqliteDatabase, storage::{ database::{DbKeyValuePair, WalletBackend, WalletDatabase, WriteOperation}, sqlite_db::WalletSqliteDatabase, @@ -68,8 +69,12 @@ use tari_wallet::{ run_migration_and_create_sqlite_connection, }, }, - test_utils::make_wallet_databases, - transaction_service::{config::TransactionServiceConfig, handle::TransactionEvent}, + test_utils::make_wallet_database_connection, + transaction_service::{ + config::TransactionServiceConfig, + handle::TransactionEvent, + storage::sqlite_db::TransactionServiceSqliteDatabase, + }, Wallet, WalletConfig, WalletSqlite, @@ -679,7 +684,7 @@ async fn test_import_utxo() { PeerFeatures::COMMUNICATION_NODE, ); let temp_dir = tempdir().unwrap(); - let (wallet_backend, tx_backend, oms_backend, contacts_backend, _temp_dir) = make_wallet_databases(None); + let (connection, _temp_dir) = make_wallet_database_connection(None); let comms_config = CommsConfig { network: Network::Weatherwax, node_identity: Arc::new(alice_identity.clone()), @@ -717,10 +722,10 @@ async fn test_import_utxo() { ); let mut alice_wallet = Wallet::start( config, - WalletDatabase::new(wallet_backend), - tx_backend, - oms_backend, - contacts_backend, + WalletDatabase::new(WalletSqliteDatabase::new(connection.clone(), None).unwrap()), + TransactionServiceSqliteDatabase::new(connection.clone(), None), + OutputManagerSqliteDatabase::new(connection.clone(), None), + ContactsServiceSqliteDatabase::new(connection), shutdown.to_signal(), None, ) diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index c5af01c7b6..af90c0bfaf 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -598,7 +598,7 @@ mod test { use tari_shutdown::Shutdown; use tari_wallet::{ output_manager_service::{handle::OutputManagerEvent, TxoValidationType}, - test_utils::make_wallet_databases, + test_utils::make_wallet_database_connection, transaction_service::{ handle::TransactionEvent, storage::{ @@ -610,6 +610,7 @@ mod test { TransactionDirection, TransactionStatus, }, + sqlite_db::TransactionServiceSqliteDatabase, }, }, }; @@ -770,8 +771,8 @@ mod test { fn test_callback_handler() { let runtime = Runtime::new().unwrap(); - let (_wallet_backend, backend, _oms_backend, _, _tempdir) = make_wallet_databases(None); - let db = TransactionDatabase::new(backend); + let (connection, _tempdir) = make_wallet_database_connection(None); + let db = TransactionDatabase::new(TransactionServiceSqliteDatabase::new(connection, None)); let rtp = ReceiverTransactionProtocol::new_placeholder(); let inbound_tx = InboundTransaction::new( 1u64,