Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Cleanup UTXO scanner database operations #3040

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions base_layer/wallet/src/utxo_scanner_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{error::WalletStorageError, output_manager_service::error::OutputManagerError};
use serde_json::Error as SerdeJsonError;
use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError};
use tari_crypto::tari_utilities::hex::HexError;
use tari_service_framework::reply_channel::TransportChannelError;
Expand Down Expand Up @@ -52,4 +53,6 @@ pub enum UtxoScannerError {
UtxoImportError(String),
#[error("Transport channel error: `{0}`")]
TransportChannelError(#[from] TransportChannelError),
#[error("Serde json error: `{0}`")]
SerdeJsonError(#[from] SerdeJsonError),
}
144 changes: 38 additions & 106 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use crate::{
use chrono::Utc;
use futures::{pin_mut, FutureExt, StreamExt};
use log::*;
use serde::{Deserialize, Serialize};
use std::{
convert::TryFrom,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -64,20 +64,17 @@ use tari_core::{
transactions::{
tari_amount::MicroTari,
transaction::{TransactionOutput, UnblindedOutput},
types::CryptoFactories,
types::{CryptoFactories, HashOutput},
},
};
use tari_crypto::tari_utilities::hex::*;
use tari_service_framework::{reply_channel, reply_channel::SenderService};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, time::delay_for};

pub const LOG_TARGET: &str = "wallet::utxo_scanning";

const TOTAL_AMOUNT_KEY: &str = "/total-amount";
const NUM_UTXOS_KEY: &str = "/num-utxos";
pub const UTXO_INDEX_KEY: &str = "/utxos-index";
const HEIGHT_HASH_KEY: &str = "/height-hash";
pub const RECOVERY_KEY: &str = "recovery_data";
const SCANNING_KEY: &str = "scanning_data";

#[derive(Debug, Clone, PartialEq)]
pub enum UtxoScannerMode {
Expand Down Expand Up @@ -228,19 +225,15 @@ where TBackend: WalletBackend + 'static
final_utxo_pos: u64,
elapsed: Duration,
) -> Result<(), UtxoScannerError> {
let num_recovered = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0);
let total_amount = self
.get_metadata(ScanningMetadataKey::TotalAmount)
.await?
.unwrap_or_else(|| 0.into());
let metadata = self.get_metadata().await?.unwrap_or_default();
self.publish_event(UtxoScannerEvent::Progress {
current_block: final_utxo_pos,
current_chain_height: final_utxo_pos,
});
self.publish_event(UtxoScannerEvent::Completed {
number_scanned: total_scanned,
number_received: num_recovered,
value_received: total_amount,
number_received: metadata.number_of_utxos,
value_received: metadata.total_amount,
time_taken: elapsed,
});

Expand Down Expand Up @@ -333,33 +326,22 @@ where TBackend: WalletBackend + 'static
}

async fn get_start_utxo_mmr_pos(&self, client: &mut BaseNodeSyncRpcClient) -> Result<u64, UtxoScannerError> {
let previous_scan_hash = self
.get_metadata::<String>(ScanningMetadataKey::HeightHash)
.await
.ok()
.flatten();
let previous_utxo_index = self
.get_metadata::<u64>(ScanningMetadataKey::UtxoIndex)
.await
.ok()
.flatten();

if previous_utxo_index.is_none() || previous_scan_hash.is_none() {
let metadata = self.get_metadata().await?.unwrap_or_default();
if metadata.height_hash.is_empty() {
// Set a value in here so that if the recovery fails on the genesis block the client will know a
// recover was started. Important on Console wallet that otherwise makes this decision based on the
// presence of the data file
self.set_metadata(ScanningMetadataKey::UtxoIndex, 0u64).await?;
self.set_metadata(metadata).await?;
return Ok(0);
}
// if it's none, we return 0 above.
let hash: Vec<u8> = from_hex(&previous_scan_hash.unwrap())?;
let request = FindChainSplitRequest {
block_hashes: vec![hash],
block_hashes: vec![metadata.height_hash],
header_count: 1,
};
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
if client.find_chain_split(request).await.is_ok() {
Ok(previous_utxo_index.unwrap())
Ok(metadata.utxo_index)
} else {
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Expand Down Expand Up @@ -450,27 +432,13 @@ where TBackend: WalletBackend + 'static
num_recovered: u64,
end_header_hash: Vec<u8>,
) -> Result<(), UtxoScannerError> {
self.set_metadata(ScanningMetadataKey::HeightHash, end_header_hash.to_hex())
.await?;
let current_num_utxos = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0u64);
self.set_metadata(
ScanningMetadataKey::NumUtxos,
(current_num_utxos + num_recovered).to_string(),
)
.await?;

let current_total_amount = self
.get_metadata::<MicroTari>(ScanningMetadataKey::TotalAmount)
.await?
.unwrap_or_else(|| 0.into());
let mut meta_data = self.get_metadata().await?.unwrap_or_default();
meta_data.height_hash = end_header_hash;
meta_data.number_of_utxos += num_recovered;
meta_data.utxo_index = last_utxo_index;
meta_data.total_amount += total_amount;

self.set_metadata(ScanningMetadataKey::UtxoIndex, last_utxo_index)
.await?;
self.set_metadata(
ScanningMetadataKey::TotalAmount,
(current_total_amount + total_amount).as_u64().to_string(),
)
.await?;
self.set_metadata(meta_data).await?;
Ok(())
}

Expand Down Expand Up @@ -534,54 +502,30 @@ where TBackend: WalletBackend + 'static

fn get_db_mode_key(&self) -> String {
match self.mode {
UtxoScannerMode::Recovery => "recovery".to_owned(),
UtxoScannerMode::Scanning => "scanning".to_owned(),
UtxoScannerMode::Recovery => RECOVERY_KEY.to_owned(),
UtxoScannerMode::Scanning => SCANNING_KEY.to_owned(),
}
}

async fn set_metadata<T: ToString>(&self, key: ScanningMetadataKey, value: T) -> Result<(), UtxoScannerError> {
let mut total_key = self.get_db_mode_key();
total_key.push_str(key.as_key_str());
self.resources
.db
.set_client_key_value(total_key, value.to_string())
.await?;
async fn set_metadata(&self, data: ScanningMetadata) -> Result<(), UtxoScannerError> {
let total_key = self.get_db_mode_key();
let db_value = serde_json::to_string(&data)?;
self.resources.db.set_client_key_value(total_key, db_value).await?;
Ok(())
}

async fn get_metadata<T>(&self, key: ScanningMetadataKey) -> Result<Option<T>, UtxoScannerError>
where
T: FromStr,
T::Err: ToString,
{
let mut total_key = self.get_db_mode_key();
total_key.push_str(key.as_key_str());
let value = self.resources.db.get_client_key_from_str(total_key).await?;
Ok(value)
async fn get_metadata(&self) -> Result<Option<ScanningMetadata>, UtxoScannerError> {
let total_key = self.get_db_mode_key();
let value: Option<String> = self.resources.db.get_client_key_from_str(total_key).await?;
match value {
None => Ok(None),
Some(v) => Ok(serde_json::from_str(&v)?),
}
}

async fn clear_db(&self) -> Result<(), UtxoScannerError> {
let total_key = self.get_db_mode_key();
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::HeightHash.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::NumUtxos.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::TotalAmount.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key + ScanningMetadataKey::UtxoIndex.as_key_str())
.await?;
let _ = self.resources.db.clear_client_value(total_key).await?;
Ok(())
}

Expand Down Expand Up @@ -838,22 +782,10 @@ fn convert_response_to_unblinded_outputs(
Ok((outputs, current_utxo_index))
}

#[derive(Debug, Clone)]
enum ScanningMetadataKey {
TotalAmount,
NumUtxos,
UtxoIndex,
HeightHash,
}

impl ScanningMetadataKey {
pub fn as_key_str(&self) -> &'static str {
use ScanningMetadataKey::*;
match self {
TotalAmount => TOTAL_AMOUNT_KEY,
NumUtxos => NUM_UTXOS_KEY,
UtxoIndex => UTXO_INDEX_KEY,
HeightHash => HEIGHT_HASH_KEY,
}
}
#[derive(Default, Serialize, Deserialize)]
struct ScanningMetadata {
pub total_amount: MicroTari,
pub number_of_utxos: u64,
pub utxo_index: u64,
pub height_hash: HashOutput,
}
6 changes: 2 additions & 4 deletions base_layer/wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,8 @@ where
/// Utility function to find out if there is data in the database indicating that there is an incomplete recovery
/// process in progress
pub async fn is_recovery_in_progress(&self) -> Result<bool, WalletError> {
use crate::utxo_scanner_service::utxo_scanning::UTXO_INDEX_KEY;
let mut key = "recovery".to_owned();
key.push_str(&UTXO_INDEX_KEY.to_string());
Ok(self.db.get_client_key_value(key).await?.is_some())
use crate::utxo_scanner_service::utxo_scanning::RECOVERY_KEY;
Ok(self.db.get_client_key_value(RECOVERY_KEY.to_string()).await?.is_some())
}
}

Expand Down