Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary clones and use multi threads in default processor. #512

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use aptos_protos::transaction::v1::{
};
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
Expand Down Expand Up @@ -330,9 +331,13 @@ impl Transaction {
let mut wscs = vec![];
let mut wsc_details = vec![];

for txn in transactions {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
let processed_txns: Vec<_> = transactions
.par_iter()
.map(Self::from_transaction)
.collect();

for processed_txn in processed_txns {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = processed_txn;
txns.push(txn);
if let Some(a) = block_metadata {
block_metadata_txns.push(a);
Expand Down
30 changes: 24 additions & 6 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ impl ProcessorTrait for DefaultProcessor {
)
.await;

// These vectors could be super large and take a lot of time to drop, move to background to
// make it faster.
tokio::task::spawn(async move {
drop(txns);
drop(block_metadata_transactions);
drop(write_set_changes);
drop(move_modules);
drop(move_resources);
drop(table_items);
drop(current_table_items);
drop(table_metadata);
});

let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64();
match tx_result {
Ok(_) => Ok(ProcessingResult::DefaultProcessingResult(
Expand Down Expand Up @@ -403,7 +416,7 @@ fn process_transactions(
TransactionModel::from_transactions(&transactions);
let mut block_metadata_transactions = vec![];
for block_metadata_txn in block_metadata_txns {
block_metadata_transactions.push(block_metadata_txn.clone());
block_metadata_transactions.push(block_metadata_txn);
}
let mut move_modules = vec![];
let mut move_resources = vec![];
Expand All @@ -412,19 +425,19 @@ fn process_transactions(
let mut table_metadata = AHashMap::new();
for detail in wsc_details {
match detail {
WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()),
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()),
WriteSetChangeDetail::Module(module) => move_modules.push(module),
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource),
WriteSetChangeDetail::Table(item, current_item, metadata) => {
table_items.push(item.clone());
table_items.push(item);
current_table_items.insert(
(
current_item.table_handle.clone(),
current_item.key_hash.clone(),
),
current_item.clone(),
current_item,
);
if let Some(meta) = metadata {
table_metadata.insert(meta.handle.clone(), meta.clone());
table_metadata.insert(meta.handle.clone(), meta);
}
},
}
Expand All @@ -440,6 +453,11 @@ fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

println!(
"table_items: {}, current_table_items: {}",
table_items.len(),
current_table_items.len()
);
if flags.contains(TableFlags::MOVE_RESOURCES) {
move_resources.clear();
}
Expand Down
Loading