From 4c8a93de475b63a532057890189fce2224a3237c Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Sat, 14 Sep 2024 00:26:23 +0000 Subject: [PATCH] Remove unnecessary clones and use multi threads in default processor. --- .../models/default_models/transactions.rs | 11 +++++-- .../src/processors/default_processor.rs | 30 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/rust/processor/src/db/common/models/default_models/transactions.rs b/rust/processor/src/db/common/models/default_models/transactions.rs index 2201da4e7..d94eb7b46 100644 --- a/rust/processor/src/db/common/models/default_models/transactions.rs +++ b/rust/processor/src/db/common/models/default_models/transactions.rs @@ -25,6 +25,7 @@ use aptos_protos::transaction::v1::{ }; use bigdecimal::BigDecimal; use field_count::FieldCount; +use rayon::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] @@ -330,9 +331,13 @@ impl Transaction { let mut wscs = vec![]; let mut wsc_details = vec![]; - for txn in transactions { - let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = - Self::from_transaction(txn); + let processed_txns: Vec<_> = transactions + .par_iter() + .map(Self::from_transaction) + .collect(); + + for processed_txn in processed_txns { + let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = processed_txn; txns.push(txn); if let Some(a) = block_metadata { block_metadata_txns.push(a); diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index d3d6525f6..4656a3f36 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -355,6 +355,19 @@ impl ProcessorTrait for DefaultProcessor { ) .await; + // These vectors could be super large and take a lot of time to drop, move to background to + // make it faster. + tokio::task::spawn(async move { + drop(txns); + drop(block_metadata_transactions); + drop(write_set_changes); + drop(move_modules); + drop(move_resources); + drop(table_items); + drop(current_table_items); + drop(table_metadata); + }); + let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( @@ -403,7 +416,7 @@ fn process_transactions( TransactionModel::from_transactions(&transactions); let mut block_metadata_transactions = vec![]; for block_metadata_txn in block_metadata_txns { - block_metadata_transactions.push(block_metadata_txn.clone()); + block_metadata_transactions.push(block_metadata_txn); } let mut move_modules = vec![]; let mut move_resources = vec![]; @@ -412,19 +425,19 @@ fn process_transactions( let mut table_metadata = AHashMap::new(); for detail in wsc_details { match detail { - WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()), - WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()), + WriteSetChangeDetail::Module(module) => move_modules.push(module), + WriteSetChangeDetail::Resource(resource) => move_resources.push(resource), WriteSetChangeDetail::Table(item, current_item, metadata) => { - table_items.push(item.clone()); + table_items.push(item); current_table_items.insert( ( current_item.table_handle.clone(), current_item.key_hash.clone(), ), - current_item.clone(), + current_item, ); if let Some(meta) = metadata { - table_metadata.insert(meta.handle.clone(), meta.clone()); + table_metadata.insert(meta.handle.clone(), meta); } }, } @@ -440,6 +453,11 @@ fn process_transactions( .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + println!( + "table_items: {}, current_table_items: {}", + table_items.len(), + current_table_items.len() + ); if flags.contains(TableFlags::MOVE_RESOURCES) { move_resources.clear(); }