Skip to content

Commit

Permalink
Remove unnecessary clones and use multi threads in default processor.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Sep 16, 2024
1 parent 35dcd59 commit 4c8a93d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use aptos_protos::transaction::v1::{
};
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
Expand Down Expand Up @@ -330,9 +331,13 @@ impl Transaction {
let mut wscs = vec![];
let mut wsc_details = vec![];

for txn in transactions {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
let processed_txns: Vec<_> = transactions
.par_iter()
.map(Self::from_transaction)
.collect();

for processed_txn in processed_txns {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = processed_txn;
txns.push(txn);
if let Some(a) = block_metadata {
block_metadata_txns.push(a);
Expand Down
30 changes: 24 additions & 6 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ impl ProcessorTrait for DefaultProcessor {
)
.await;

// These vectors could be super large and take a lot of time to drop, move to background to
// make it faster.
tokio::task::spawn(async move {
drop(txns);
drop(block_metadata_transactions);
drop(write_set_changes);
drop(move_modules);
drop(move_resources);
drop(table_items);
drop(current_table_items);
drop(table_metadata);
});

let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64();
match tx_result {
Ok(_) => Ok(ProcessingResult::DefaultProcessingResult(
Expand Down Expand Up @@ -403,7 +416,7 @@ fn process_transactions(
TransactionModel::from_transactions(&transactions);
let mut block_metadata_transactions = vec![];
for block_metadata_txn in block_metadata_txns {
block_metadata_transactions.push(block_metadata_txn.clone());
block_metadata_transactions.push(block_metadata_txn);
}
let mut move_modules = vec![];
let mut move_resources = vec![];
Expand All @@ -412,19 +425,19 @@ fn process_transactions(
let mut table_metadata = AHashMap::new();
for detail in wsc_details {
match detail {
WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()),
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()),
WriteSetChangeDetail::Module(module) => move_modules.push(module),
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource),
WriteSetChangeDetail::Table(item, current_item, metadata) => {
table_items.push(item.clone());
table_items.push(item);
current_table_items.insert(
(
current_item.table_handle.clone(),
current_item.key_hash.clone(),
),
current_item.clone(),
current_item,
);
if let Some(meta) = metadata {
table_metadata.insert(meta.handle.clone(), meta.clone());
table_metadata.insert(meta.handle.clone(), meta);
}
},
}
Expand All @@ -440,6 +453,11 @@ fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

println!(
"table_items: {}, current_table_items: {}",
table_items.len(),
current_table_items.len()
);
if flags.contains(TableFlags::MOVE_RESOURCES) {
move_resources.clear();
}
Expand Down

0 comments on commit 4c8a93d

Please sign in to comment.