From 836d83aa7f638e44a27922b4e7ada06b35f05b5c Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Wed, 4 Sep 2024 23:34:40 -0700 Subject: [PATCH] Simplify account_transaction_processor and use rayon to speed it up. (#495) --- .../account_transactions.rs | 98 ++++++------------- .../account_transactions_processor.rs | 32 +++--- rust/processor/src/worker.rs | 2 +- 3 files changed, 47 insertions(+), 85 deletions(-) diff --git a/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs b/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs index 9baa02b7b..385d1e010 100644 --- a/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs +++ b/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs @@ -13,11 +13,8 @@ use crate::{ schema::account_transactions, utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, }; -use ahash::AHashMap; -use aptos_protos::transaction::v1::{ - transaction::TxnData, write_set_change::Change, DeleteResource, Event, Transaction, - WriteResource, -}; +use ahash::AHashSet; +use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -39,7 +36,7 @@ impl AccountTransaction { /// We will also consider transactions that the account signed or is part of a multi sig / multi agent. 
/// TODO: recursively find the parent account of an object /// TODO: include table items in the detection path - pub fn from_transaction(transaction: &Transaction) -> AHashMap { + pub fn get_accounts(transaction: &Transaction) -> AHashSet { let txn_version = transaction.version as i64; let txn_data = match transaction.txn_data.as_ref() { Some(data) => data, @@ -51,7 +48,7 @@ impl AccountTransaction { transaction_version = transaction.version, "Transaction data doesn't exist", ); - return AHashMap::new(); + return AHashSet::new(); }, }; let transaction_info = transaction.info.as_ref().unwrap_or_else(|| { @@ -73,82 +70,43 @@ impl AccountTransaction { TxnData::BlockMetadata(inner) => (&inner.events, vec![]), TxnData::Validator(inner) => (&inner.events, vec![]), _ => { - return AHashMap::new(); + return AHashSet::new(); }, }; - let mut account_transactions = AHashMap::new(); - for sig in &signatures { - account_transactions.insert((sig.signer.clone(), txn_version), Self { - transaction_version: txn_version, - account_address: sig.signer.clone(), - }); + let mut accounts = AHashSet::new(); + for sig in signatures { + accounts.insert(sig.signer); } for event in events { - account_transactions.extend(Self::from_event(event, txn_version)); + // Record event account address. We don't really have to worry about objects here + // because it'll be taken care of in the resource section. + accounts.insert(standardize_address( + event.key.as_ref().unwrap().account_address.as_str(), + )); } for wsc in wscs { match wsc.change.as_ref().unwrap() { Change::DeleteResource(res) => { - account_transactions - .extend(Self::from_delete_resource(res, txn_version).unwrap()); + // Record resource account. + // TODO: If the resource is an object, then we need to look for the latest + // owner. This isn't really possible right now given we have parallel threads + // so it'll be very difficult to ensure that we have the correct latest owner. 
+ accounts.insert(standardize_address(res.address.as_str())); }, Change::WriteResource(res) => { - account_transactions - .extend(Self::from_write_resource(res, txn_version).unwrap()); + // Record resource account. If the resource is an object, then we record the + // owner as well. + // This handles partial deletes as well. + accounts.insert(standardize_address(res.address.as_str())); + if let Some(inner) = + &ObjectWithMetadata::from_write_resource(res, txn_version).unwrap() + { + accounts.insert(inner.object_core.get_owner_address()); + } }, _ => {}, } } - account_transactions - } - - /// Base case, record event account address. We don't really have to worry about - /// objects here because it'll be taken care of in the resource section - fn from_event(event: &Event, txn_version: i64) -> AHashMap { - let account_address = - standardize_address(event.key.as_ref().unwrap().account_address.as_str()); - AHashMap::from([((account_address.clone(), txn_version), Self { - transaction_version: txn_version, - account_address, - })]) - } - - /// Base case, record resource account. If the resource is an object, then we record the owner as well - /// This handles partial deletes as well - fn from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - let mut result = AHashMap::new(); - let account_address = standardize_address(write_resource.address.as_str()); - result.insert((account_address.clone(), txn_version), Self { - transaction_version: txn_version, - account_address, - }); - if let Some(inner) = &ObjectWithMetadata::from_write_resource(write_resource, txn_version)? - { - result.insert((inner.object_core.get_owner_address(), txn_version), Self { - transaction_version: txn_version, - account_address: inner.object_core.get_owner_address(), - }); - } - Ok(result) - } - - /// Base case, record resource account. - /// TODO: If the resource is an object, then we need to look for the latest owner. 
This isn't really possible - /// right now given we have parallel threads so it'll be very difficult to ensure that we have the correct - /// latest owner - fn from_delete_resource( - delete_resource: &DeleteResource, - txn_version: i64, - ) -> anyhow::Result> { - let mut result = AHashMap::new(); - let account_address = standardize_address(delete_resource.address.as_str()); - result.insert((account_address.clone(), txn_version), Self { - transaction_version: txn_version, - account_address, - }); - Ok(result) + accounts } } diff --git a/rust/processor/src/processors/account_transactions_processor.rs b/rust/processor/src/processors/account_transactions_processor.rs index f7ef88344..7101f60ab 100644 --- a/rust/processor/src/processors/account_transactions_processor.rs +++ b/rust/processor/src/processors/account_transactions_processor.rs @@ -13,6 +13,7 @@ use anyhow::bail; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{pg::Pg, query_builder::QueryFragment}; +use rayon::prelude::*; use std::fmt::Debug; use tracing::error; @@ -101,20 +102,23 @@ impl ProcessorTrait for AccountTransactionsProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut account_transactions = AHashMap::new(); - - for txn in &transactions { - account_transactions.extend(AccountTransaction::from_transaction(txn)); - } - let mut account_transactions = account_transactions - .into_values() - .collect::>(); - - // Sort by PK - account_transactions.sort_by(|a, b| { - (&a.transaction_version, &a.account_address) - .cmp(&(&b.transaction_version, &b.account_address)) - }); + let account_transactions: Vec<_> = transactions + .into_par_iter() + .map(|txn| { + let transaction_version = txn.version as i64; + let accounts = AccountTransaction::get_accounts(&txn); + accounts + .into_iter() + .map(|account_address| AccountTransaction { + transaction_version, + 
+ account_address, + }) + .collect() + }) + .collect::<Vec<Vec<_>>>() + .into_iter() + .flatten() + .collect(); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index adbc7fa2a..e784d8d72 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -562,7 +562,7 @@ impl Worker { let num_processed = (last_txn_version - first_txn_version) + 1; - debug!( + info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, first_txn_version,