Skip to content

Commit

Permalink
refactor: clean up the db calls to make them safer (#3040)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Jul 4, 2021
2 parents 2c9f699 + d76768d commit 5116bde
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 110 deletions.
3 changes: 3 additions & 0 deletions base_layer/wallet/src/utxo_scanner_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{error::WalletStorageError, output_manager_service::error::OutputManagerError};
use serde_json::Error as SerdeJsonError;
use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError};
use tari_crypto::tari_utilities::hex::HexError;
use tari_service_framework::reply_channel::TransportChannelError;
Expand Down Expand Up @@ -52,4 +53,6 @@ pub enum UtxoScannerError {
UtxoImportError(String),
#[error("Transport channel error: `{0}`")]
TransportChannelError(#[from] TransportChannelError),
#[error("Serde json error: `{0}`")]
SerdeJsonError(#[from] SerdeJsonError),
}
144 changes: 38 additions & 106 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use crate::{
use chrono::Utc;
use futures::{pin_mut, FutureExt, StreamExt};
use log::*;
use serde::{Deserialize, Serialize};
use std::{
convert::TryFrom,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -64,20 +64,17 @@ use tari_core::{
transactions::{
tari_amount::MicroTari,
transaction::{TransactionOutput, UnblindedOutput},
types::CryptoFactories,
types::{CryptoFactories, HashOutput},
},
};
use tari_crypto::tari_utilities::hex::*;
use tari_service_framework::{reply_channel, reply_channel::SenderService};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, time::delay_for};

pub const LOG_TARGET: &str = "wallet::utxo_scanning";

const TOTAL_AMOUNT_KEY: &str = "/total-amount";
const NUM_UTXOS_KEY: &str = "/num-utxos";
pub const UTXO_INDEX_KEY: &str = "/utxos-index";
const HEIGHT_HASH_KEY: &str = "/height-hash";
pub const RECOVERY_KEY: &str = "recovery_data";
const SCANNING_KEY: &str = "scanning_data";

#[derive(Debug, Clone, PartialEq)]
pub enum UtxoScannerMode {
Expand Down Expand Up @@ -228,19 +225,15 @@ where TBackend: WalletBackend + 'static
final_utxo_pos: u64,
elapsed: Duration,
) -> Result<(), UtxoScannerError> {
let num_recovered = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0);
let total_amount = self
.get_metadata(ScanningMetadataKey::TotalAmount)
.await?
.unwrap_or_else(|| 0.into());
let metadata = self.get_metadata().await?.unwrap_or_default();
self.publish_event(UtxoScannerEvent::Progress {
current_block: final_utxo_pos,
current_chain_height: final_utxo_pos,
});
self.publish_event(UtxoScannerEvent::Completed {
number_scanned: total_scanned,
number_received: num_recovered,
value_received: total_amount,
number_received: metadata.number_of_utxos,
value_received: metadata.total_amount,
time_taken: elapsed,
});

Expand Down Expand Up @@ -333,33 +326,22 @@ where TBackend: WalletBackend + 'static
}

async fn get_start_utxo_mmr_pos(&self, client: &mut BaseNodeSyncRpcClient) -> Result<u64, UtxoScannerError> {
let previous_scan_hash = self
.get_metadata::<String>(ScanningMetadataKey::HeightHash)
.await
.ok()
.flatten();
let previous_utxo_index = self
.get_metadata::<u64>(ScanningMetadataKey::UtxoIndex)
.await
.ok()
.flatten();

if previous_utxo_index.is_none() || previous_scan_hash.is_none() {
let metadata = self.get_metadata().await?.unwrap_or_default();
if metadata.height_hash.is_empty() {
// Set a value in here so that if the recovery fails on the genesis block the client will know a
// recover was started. Important on Console wallet that otherwise makes this decision based on the
// presence of the data file
self.set_metadata(ScanningMetadataKey::UtxoIndex, 0u64).await?;
self.set_metadata(metadata).await?;
return Ok(0);
}
// if it's none, we return 0 above.
let hash: Vec<u8> = from_hex(&previous_scan_hash.unwrap())?;
let request = FindChainSplitRequest {
block_hashes: vec![hash],
block_hashes: vec![metadata.height_hash],
header_count: 1,
};
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
if client.find_chain_split(request).await.is_ok() {
Ok(previous_utxo_index.unwrap())
Ok(metadata.utxo_index)
} else {
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Expand Down Expand Up @@ -450,27 +432,13 @@ where TBackend: WalletBackend + 'static
num_recovered: u64,
end_header_hash: Vec<u8>,
) -> Result<(), UtxoScannerError> {
self.set_metadata(ScanningMetadataKey::HeightHash, end_header_hash.to_hex())
.await?;
let current_num_utxos = self.get_metadata(ScanningMetadataKey::NumUtxos).await?.unwrap_or(0u64);
self.set_metadata(
ScanningMetadataKey::NumUtxos,
(current_num_utxos + num_recovered).to_string(),
)
.await?;

let current_total_amount = self
.get_metadata::<MicroTari>(ScanningMetadataKey::TotalAmount)
.await?
.unwrap_or_else(|| 0.into());
let mut meta_data = self.get_metadata().await?.unwrap_or_default();
meta_data.height_hash = end_header_hash;
meta_data.number_of_utxos += num_recovered;
meta_data.utxo_index = last_utxo_index;
meta_data.total_amount += total_amount;

self.set_metadata(ScanningMetadataKey::UtxoIndex, last_utxo_index)
.await?;
self.set_metadata(
ScanningMetadataKey::TotalAmount,
(current_total_amount + total_amount).as_u64().to_string(),
)
.await?;
self.set_metadata(meta_data).await?;
Ok(())
}

Expand Down Expand Up @@ -534,54 +502,30 @@ where TBackend: WalletBackend + 'static

fn get_db_mode_key(&self) -> String {
match self.mode {
UtxoScannerMode::Recovery => "recovery".to_owned(),
UtxoScannerMode::Scanning => "scanning".to_owned(),
UtxoScannerMode::Recovery => RECOVERY_KEY.to_owned(),
UtxoScannerMode::Scanning => SCANNING_KEY.to_owned(),
}
}

async fn set_metadata<T: ToString>(&self, key: ScanningMetadataKey, value: T) -> Result<(), UtxoScannerError> {
let mut total_key = self.get_db_mode_key();
total_key.push_str(key.as_key_str());
self.resources
.db
.set_client_key_value(total_key, value.to_string())
.await?;
async fn set_metadata(&self, data: ScanningMetadata) -> Result<(), UtxoScannerError> {
let total_key = self.get_db_mode_key();
let db_value = serde_json::to_string(&data)?;
self.resources.db.set_client_key_value(total_key, db_value).await?;
Ok(())
}

async fn get_metadata<T>(&self, key: ScanningMetadataKey) -> Result<Option<T>, UtxoScannerError>
where
T: FromStr,
T::Err: ToString,
{
let mut total_key = self.get_db_mode_key();
total_key.push_str(key.as_key_str());
let value = self.resources.db.get_client_key_from_str(total_key).await?;
Ok(value)
async fn get_metadata(&self) -> Result<Option<ScanningMetadata>, UtxoScannerError> {
let total_key = self.get_db_mode_key();
let value: Option<String> = self.resources.db.get_client_key_from_str(total_key).await?;
match value {
None => Ok(None),
Some(v) => Ok(serde_json::from_str(&v)?),
}
}

async fn clear_db(&self) -> Result<(), UtxoScannerError> {
let total_key = self.get_db_mode_key();
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::HeightHash.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::NumUtxos.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key.clone() + ScanningMetadataKey::TotalAmount.as_key_str())
.await?;
let _ = self
.resources
.db
.clear_client_value(total_key + ScanningMetadataKey::UtxoIndex.as_key_str())
.await?;
let _ = self.resources.db.clear_client_value(total_key).await?;
Ok(())
}

Expand Down Expand Up @@ -838,22 +782,10 @@ fn convert_response_to_unblinded_outputs(
Ok((outputs, current_utxo_index))
}

#[derive(Debug, Clone)]
enum ScanningMetadataKey {
TotalAmount,
NumUtxos,
UtxoIndex,
HeightHash,
}

impl ScanningMetadataKey {
pub fn as_key_str(&self) -> &'static str {
use ScanningMetadataKey::*;
match self {
TotalAmount => TOTAL_AMOUNT_KEY,
NumUtxos => NUM_UTXOS_KEY,
UtxoIndex => UTXO_INDEX_KEY,
HeightHash => HEIGHT_HASH_KEY,
}
}
#[derive(Default, Serialize, Deserialize)]
struct ScanningMetadata {
pub total_amount: MicroTari,
pub number_of_utxos: u64,
pub utxo_index: u64,
pub height_hash: HashOutput,
}
6 changes: 2 additions & 4 deletions base_layer/wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,8 @@ where
/// Utility function to find out if there is data in the database indicating that there is an incomplete recovery
/// process in progress
pub async fn is_recovery_in_progress(&self) -> Result<bool, WalletError> {
use crate::utxo_scanner_service::utxo_scanning::UTXO_INDEX_KEY;
let mut key = "recovery".to_owned();
key.push_str(&UTXO_INDEX_KEY.to_string());
Ok(self.db.get_client_key_value(key).await?.is_some())
use crate::utxo_scanner_service::utxo_scanning::RECOVERY_KEY;
Ok(self.db.get_client_key_value(RECOVERY_KEY.to_string()).await?.is_some())
}
}

Expand Down

0 comments on commit 5116bde

Please sign in to comment.