Skip to content

Commit

Permalink
refactor: use bound executor for outbound messaging pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Feb 3, 2022
1 parent 3495e85 commit ab3a523
Show file tree
Hide file tree
Showing 28 changed files with 461 additions and 230 deletions.
3 changes: 2 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ where B: BlockchainBackend + 'static
auxilary_tcp_listener_address: self.config.auxilary_tcp_listener_address.clone(),
datastore_path: self.config.peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 100,
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
dht: DhtConfig {
database_url: DbConnectionUrl::File(self.config.data_dir.join("dht.db")),
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl CommandHandler {
status_line.add_field(
"Rpc",
format!(
"{}/{} sessions",
"{}/{}",
num_active_rpc_sessions,
config
.rpc_max_simultaneous_sessions
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,9 @@ pub async fn init_wallet(
auxilary_tcp_listener_address: None,
datastore_path: config.console_wallet_peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 100,
outbound_buffer_size: 100,
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 10,
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht-console-wallet.db")),
auto_join: true,
Expand Down
3 changes: 2 additions & 1 deletion applications/tari_validator_node/src/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ fn create_comms_config(config: &GlobalConfig, node_identity: Arc<NodeIdentity>)
transport_type: create_transport_type(config),
datastore_path: config.peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 100,
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
dht: DhtConfig {
database_url: DbConnectionUrl::File(config.data_dir.join("dht.db")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ where B: BlockchainBackend + 'static
})
},
NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(&excess_sigs).await;
let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
FetchMempoolTransactionsResponse {
transactions,
Expand Down Expand Up @@ -553,7 +553,7 @@ where B: BlockchainBackend + 'static
kernel_excess_sigs: excess_sigs,
} = new_block;

let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(&excess_sigs).await;
let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);
Expand Down Expand Up @@ -587,7 +587,7 @@ where B: BlockchainBackend + 'static

// Add returned transactions to unconfirmed pool
if !transactions.is_empty() {
self.mempool.insert_all(&transactions).await?;
self.mempool.insert_all(transactions.clone()).await?;
}

if !not_found.is_empty() {
Expand Down Expand Up @@ -708,8 +708,6 @@ where B: BlockchainBackend + 'static
BlockAddResult::ChainReorg { .. } => true,
};

self.blockchain_db.cleanup_orphans().await?;

self.update_block_result_metrics(&block_add_result);
self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ use crate::{
sync::{rpc, SyncPeer},
},
blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput},
chain_storage::{
async_db::AsyncBlockchainDb,
BlockchainBackend,
ChainStorageError,
DbTransaction,
MmrTree,
PrunedOutput,
},
proto::base_node::{
sync_utxo as proto_sync_utxo,
sync_utxos_response::UtxoOrDeleted,
Expand Down Expand Up @@ -697,84 +704,89 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;

let height = header.height();
let bitmap = self.take_final_bitmap();
let mut txn = self.db().write_transaction();
let mut utxo_mmr_position = 0;
let mut prune_positions = vec![];

for h in 0..=header.height() {
let curr_header = self.db().fetch_chain_header(h).await?;

trace!(
target: LOG_TARGET,
"Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().output_mmr_size,
prev_mmr,
curr_header.header().output_mmr_size - 1
);
let (utxos, _) = self.db().fetch_utxos_in_block(curr_header.hash().clone(), None).await?;
trace!(
target: LOG_TARGET,
"Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
);

trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
let mut prune_counter = 0;
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
if bitmap.contains(utxo_mmr_position) {
debug!(
target: LOG_TARGET,
"Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position
);
prune_positions.push(utxo_mmr_position);
prune_counter += 1;
} else {
pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
}
},
_ => {
prune_counter += 1;
},
}
utxo_mmr_position += 1;
}
if prune_counter > 0 {
trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
}
prev_mmr = curr_header.header().output_mmr_size;
let db = self.db().inner().clone();
task::spawn_blocking(move || {
let mut txn = DbTransaction::new();
let mut utxo_mmr_position = 0;
let mut prune_positions = vec![];

let kernels = self.db().fetch_kernels_in_block(curr_header.hash().clone()).await?;
trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
for k in kernels {
pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;
for h in 0..=height {
let curr_header = db.fetch_chain_header(h)?;

if h % 1000 == 0 {
debug!(
trace!(
target: LOG_TARGET,
"Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().output_mmr_size,
prev_mmr,
curr_header.header().output_mmr_size - 1
);
let (utxos, _) = db.fetch_utxos_in_block(curr_header.hash().clone(), None)?;
trace!(
target: LOG_TARGET,
"Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ",
(h as f32 / header.height() as f32) * 100.0,
h,
utxo_mmr_position,
"Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
);

trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
let mut prune_counter = 0;
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
if bitmap.contains(utxo_mmr_position) {
debug!(
target: LOG_TARGET,
"Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position
);
prune_positions.push(utxo_mmr_position);
prune_counter += 1;
} else {
pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
}
},
_ => {
prune_counter += 1;
},
}
utxo_mmr_position += 1;
}
if prune_counter > 0 {
trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
}
prev_mmr = curr_header.header().output_mmr_size;

let kernels = db.fetch_kernels_in_block(curr_header.hash().clone())?;
trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
for k in kernels {
pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;

if h % 1000 == 0 {
debug!(
target: LOG_TARGET,
"Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ",
(h as f32 / height as f32) * 100.0,
h,
utxo_mmr_position,
);
}
}
}

if !prune_positions.is_empty() {
debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len());
txn.prune_output_at_positions(prune_positions);
txn.commit().await?;
}
if !prune_positions.is_empty() {
debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len());
txn.prune_outputs_at_positions(prune_positions);
db.write(txn)?;
}

Ok((pruned_utxo_sum, pruned_kernel_sum))
Ok((pruned_utxo_sum, pruned_kernel_sum))
})
.await?
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/blocks/block_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Display for BlockHeader {
)?;
writeln!(
fmt,
"Merkle roots:\nInputs: {},\n Outputs: {} ({})\nWitness: {}\nKernels: {} ({})\n",
"Merkle roots:\nInputs: {},\nOutputs: {} ({})\nWitness: {}\nKernels: {} ({})\n",
self.input_mr.to_hex(),
self.output_mr.to_hex(),
self.output_mmr_size,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
self
}

pub fn prune_output_at_positions(&mut self, positions: Vec<u32>) -> &mut Self {
pub fn prune_outputs_at_positions(&mut self, positions: Vec<u32>) -> &mut Self {
self.transaction.prune_outputs_at_positions(positions);
self
}
Expand Down
17 changes: 14 additions & 3 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,13 +888,20 @@ where B: BlockchainBackend
);
return Err(e.into());
}

trace!(
target: LOG_TARGET,
"[add_block] acquired write access db lock for block #{} ",
&new_height
"[add_block] waiting for write access to add block block #{}",
new_height
);
let timer = Instant::now();
let mut db = self.db_write_access()?;

trace!(
target: LOG_TARGET,
"[add_block] acquired write access db lock for block #{} in {:.2?}",
new_height,
timer.elapsed()
);
let block_add_result = add_block(
&mut *db,
&self.config,
Expand All @@ -915,6 +922,10 @@ where B: BlockchainBackend
prune_database_if_needed(&mut *db, self.config.pruning_horizon, self.config.pruning_interval)?;
}

if let Err(e) = cleanup_orphans(&mut *db, self.config.orphan_storage_capacity) {
warn!(target: LOG_TARGET, "Failed to clean up orphans: {}", e);
}

debug!(
target: LOG_TARGET,
"Candidate block `add_block` result: {}", block_add_result
Expand Down
5 changes: 5 additions & 0 deletions base_layer/core/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
use tokio::task::JoinError;

use crate::{mempool::unconfirmed_pool::UnconfirmedPoolError, transactions::transaction::TransactionError};

Expand All @@ -35,4 +36,8 @@ pub enum MempoolError {
TransportChannelError(#[from] TransportChannelError),
#[error("The transaction did not contain any kernels")]
TransactionNoKernels,
#[error("Mempool lock poisoned. This indicates that the mempool has panicked while holding a RwLockGuard.")]
RwLockPoisonError,
#[error(transparent)]
BlockingTaskError(#[from] JoinError),
}
Loading

0 comments on commit ab3a523

Please sign in to comment.