diff --git a/base_layer/wallet/src/utxo_scanner_service/error.rs b/base_layer/wallet/src/utxo_scanner_service/error.rs index 781e7c0c69..e2ae7db303 100644 --- a/base_layer/wallet/src/utxo_scanner_service/error.rs +++ b/base_layer/wallet/src/utxo_scanner_service/error.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{error::WalletStorageError, output_manager_service::error::OutputManagerError}; +use serde_json::Error as SerdeJsonError; use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError}; use tari_crypto::tari_utilities::hex::HexError; use tari_service_framework::reply_channel::TransportChannelError; @@ -52,4 +53,6 @@ pub enum UtxoScannerError { UtxoImportError(String), #[error("Transport channel error: `{0}`")] TransportChannelError(#[from] TransportChannelError), + #[error("Serde json error: `{0}`")] + SerdeJsonError(#[from] SerdeJsonError), } diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs index 774cd83edc..17e129d662 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs @@ -37,9 +37,9 @@ use crate::{ use chrono::Utc; use futures::{pin_mut, FutureExt, StreamExt}; use log::*; +use serde::{Deserialize, Serialize}; use std::{ convert::TryFrom, - str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -64,20 +64,17 @@ use tari_core::{ transactions::{ tari_amount::MicroTari, transaction::{TransactionOutput, UnblindedOutput}, - types::CryptoFactories, + types::{CryptoFactories, HashOutput}, }, }; -use tari_crypto::tari_utilities::hex::*; use tari_service_framework::{reply_channel, reply_channel::SenderService}; use tari_shutdown::ShutdownSignal; use tokio::{sync::broadcast, time::delay_for}; pub const LOG_TARGET: &str = "wallet::utxo_scanning"; -const TOTAL_AMOUNT_KEY: &str = "/total-amount"; -const NUM_UTXOS_KEY: &str = "/num-utxos"; -pub const UTXO_INDEX_KEY: &str = "/utxos-index"; -const HEIGHT_HASH_KEY: &str = "/height-hash"; +pub const RECOVERY_KEY: &str = "recovery_data"; +const SCANNING_KEY: &str = "scanning_data"; #[derive(Debug, Clone, PartialEq)] pub enum UtxoScannerMode { @@ -228,19 +225,15 @@ where TBackend: WalletBackend + 'static final_utxo_pos: u64, elapsed: Duration, ) -> Result<(), UtxoScannerError> { - let num_recovered = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0); - let total_amount = self - .get_metadata(ScanningMetadataKey::TotalAmount) - .await? - .unwrap_or_else(|| 0.into()); + let metadata = self.get_metadata().await?.unwrap_or_default(); self.publish_event(UtxoScannerEvent::Progress { current_block: final_utxo_pos, current_chain_height: final_utxo_pos, }); self.publish_event(UtxoScannerEvent::Completed { number_scanned: total_scanned, - number_received: num_recovered, - value_received: total_amount, + number_received: metadata.number_of_utxos, + value_received: metadata.total_amount, time_taken: elapsed, }); @@ -333,33 +326,22 @@ where TBackend: WalletBackend + 'static } async fn get_start_utxo_mmr_pos(&self, client: &mut BaseNodeSyncRpcClient) -> Result { - let previous_scan_hash = self - .get_metadata::(ScanningMetadataKey::HeightHash) - .await - .ok() - .flatten(); - let previous_utxo_index = self - .get_metadata::(ScanningMetadataKey::UtxoIndex) - .await - .ok() - .flatten(); - - if previous_utxo_index.is_none() || previous_scan_hash.is_none() { + let metadata = self.get_metadata().await?.unwrap_or_default(); + if metadata.height_hash.is_empty() { // Set a value in here so that if the recovery fails on the genesis block the client will know a // recover was started. Important on Console wallet that otherwise makes this decision based on the // presence of the data file - self.set_metadata(ScanningMetadataKey::UtxoIndex, 0u64).await?; + self.set_metadata(metadata).await?; return Ok(0); } // if it's none, we return 0 above. - let hash: Vec = from_hex(&previous_scan_hash.unwrap())?; let request = FindChainSplitRequest { - block_hashes: vec![hash], + block_hashes: vec![metadata.height_hash], header_count: 1, }; // this returns the index of the vec of hashes we sent it, that is the last hash it knows of. if client.find_chain_split(request).await.is_ok() { - Ok(previous_utxo_index.unwrap()) + Ok(metadata.utxo_index) } else { // The node does not know of the last hash we scanned, thus we had a chain split. // We now start at 0 again. @@ -450,27 +432,13 @@ where TBackend: WalletBackend + 'static num_recovered: u64, end_header_hash: Vec, ) -> Result<(), UtxoScannerError> { - self.set_metadata(ScanningMetadataKey::HeightHash, end_header_hash.to_hex()) - .await?; - let current_num_utxos = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0u64); - self.set_metadata( - ScanningMetadataKey::NumUtxos, - (current_num_utxos + num_recovered).to_string(), - ) - .await?; - - let current_total_amount = self - .get_metadata::(ScanningMetadataKey::TotalAmount) - .await? - .unwrap_or_else(|| 0.into()); + let mut meta_data = self.get_metadata().await?.unwrap_or_default(); + meta_data.height_hash = end_header_hash; + meta_data.number_of_utxos += num_recovered; + meta_data.utxo_index = last_utxo_index; + meta_data.total_amount += total_amount; - self.set_metadata(ScanningMetadataKey::UtxoIndex, last_utxo_index) - .await?; - self.set_metadata( - ScanningMetadataKey::TotalAmount, - (current_total_amount + total_amount).as_u64().to_string(), - ) - .await?; + self.set_metadata(meta_data).await?; Ok(()) } @@ -534,54 +502,30 @@ where TBackend: WalletBackend + 'static fn get_db_mode_key(&self) -> String { match self.mode { - UtxoScannerMode::Recovery => "recovery".to_owned(), - UtxoScannerMode::Scanning => "scanning".to_owned(), + UtxoScannerMode::Recovery => RECOVERY_KEY.to_owned(), + UtxoScannerMode::Scanning => SCANNING_KEY.to_owned(), } } - async fn set_metadata(&self, key: ScanningMetadataKey, value: T) -> Result<(), UtxoScannerError> { - let mut total_key = self.get_db_mode_key(); - total_key.push_str(key.as_key_str()); - self.resources - .db - .set_client_key_value(total_key, value.to_string()) - .await?; + async fn set_metadata(&self, data: ScanningMetadata) -> Result<(), UtxoScannerError> { + let total_key = self.get_db_mode_key(); + let db_value = serde_json::to_string(&data)?; + self.resources.db.set_client_key_value(total_key, db_value).await?; Ok(()) } - async fn get_metadata(&self, key: ScanningMetadataKey) -> Result, UtxoScannerError> - where - T: FromStr, - T::Err: ToString, - { - let mut total_key = self.get_db_mode_key(); - total_key.push_str(key.as_key_str()); - let value = self.resources.db.get_client_key_from_str(total_key).await?; - Ok(value) + async fn get_metadata(&self) -> Result, UtxoScannerError> { + let total_key = self.get_db_mode_key(); + let value: Option = self.resources.db.get_client_key_from_str(total_key).await?; + match value { + None => Ok(None), + Some(v) => Ok(serde_json::from_str(&v)?), + } } async fn clear_db(&self) -> Result<(), UtxoScannerError> { let total_key = self.get_db_mode_key(); - let _ = self - .resources - .db - .clear_client_value(total_key.clone() + ScanningMetadataKey::HeightHash.as_key_str()) - .await?; - let _ = self - .resources - .db - .clear_client_value(total_key.clone() + ScanningMetadataKey::NumUtxos.as_key_str()) - .await?; - let _ = self - .resources - .db - .clear_client_value(total_key.clone() + ScanningMetadataKey::TotalAmount.as_key_str()) - .await?; - let _ = self - .resources - .db - .clear_client_value(total_key + ScanningMetadataKey::UtxoIndex.as_key_str()) - .await?; + let _ = self.resources.db.clear_client_value(total_key).await?; Ok(()) } @@ -838,22 +782,10 @@ fn convert_response_to_unblinded_outputs( Ok((outputs, current_utxo_index)) } -#[derive(Debug, Clone)] -enum ScanningMetadataKey { - TotalAmount, - NumUtxos, - UtxoIndex, - HeightHash, -} - -impl ScanningMetadataKey { - pub fn as_key_str(&self) -> &'static str { - use ScanningMetadataKey::*; - match self { - TotalAmount => TOTAL_AMOUNT_KEY, - NumUtxos => NUM_UTXOS_KEY, - UtxoIndex => UTXO_INDEX_KEY, - HeightHash => HEIGHT_HASH_KEY, - } - } +#[derive(Default, Serialize, Deserialize)] +struct ScanningMetadata { + pub total_amount: MicroTari, + pub number_of_utxos: u64, + pub utxo_index: u64, + pub height_hash: HashOutput, } diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 28d015e108..5ed26f536d 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -464,10 +464,8 @@ where /// Utility function to find out if there is data in the database indicating that there is an incomplete recovery /// process in progress pub async fn is_recovery_in_progress(&self) -> Result { - use crate::utxo_scanner_service::utxo_scanning::UTXO_INDEX_KEY; - let mut key = "recovery".to_owned(); - key.push_str(&UTXO_INDEX_KEY.to_string()); - Ok(self.db.get_client_key_value(key).await?.is_some()) + use crate::utxo_scanner_service::utxo_scanning::RECOVERY_KEY; + Ok(self.db.get_client_key_value(RECOVERY_KEY.to_string()).await?.is_some()) } }