diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1963f8d2f..fc72260e0 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -3006,6 +3006,7 @@ dependencies = [ "postgres-native-tls", "prometheus", "prost 0.12.3", + "rayon", "regex", "serde", "serde_json", diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 4e748c8f7..32c87ad6a 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -46,6 +46,7 @@ num_cpus = { workspace = true } once_cell = { workspace = true } prometheus = { workspace = true } prost = { workspace = true } +rayon = { workspace = true } regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs index 9dd5afbc8..f0e000aa7 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs @@ -59,7 +59,7 @@ pub struct FungibleAssetActivity { } impl FungibleAssetActivity { - pub async fn get_v2_from_event( + pub fn get_v2_from_event( event: &Event, txn_version: i64, block_height: i64, diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs index 648dc4da9..8a22801b3 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -148,7 +148,7 @@ impl From<&CurrentFungibleAssetBalance> for CurrentUnifiedFungibleAssetBalance { impl FungibleAssetBalance { /// Basically just need to index FA Store, but we'll need to look up FA metadata - pub async fn get_v2_from_write_resource( + pub fn get_v2_from_write_resource( write_resource: &WriteResource, write_set_change_index: i64, txn_version: i64, diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 3dbd424e5..68f4a51b2 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -40,6 +40,7 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; +use rayon::prelude::*; use std::fmt::Debug; use tracing::error; @@ -453,15 +454,21 @@ async fn parse_v2_coin( let mut current_fungible_asset_balances: CurrentFungibleAssetMapping = AHashMap::new(); let mut fungible_asset_metadata: FungibleAssetMetadataMapping = AHashMap::new(); - // Get Metadata for fungible assets by object - let mut fungible_asset_object_helper: ObjectAggregatedDataMapping = AHashMap::new(); - - for txn in transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { + let data: Vec<_> = transactions + .par_iter() + .map(|txn| { + let mut fungible_asset_activities = vec![]; + let mut fungible_asset_balances = vec![]; + let mut all_coin_supply = vec![]; + let mut current_fungible_asset_balances: CurrentFungibleAssetMapping = AHashMap::new(); + let mut fungible_asset_metadata: FungibleAssetMetadataMapping = AHashMap::new(); + + // Get Metadata for fungible assets by object + let mut fungible_asset_object_helper: ObjectAggregatedDataMapping = AHashMap::new(); + + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + if txn.txn_data.is_none() { tracing::warn!( transaction_version = txn_version, "Transaction data doesn't exist" @@ -469,278 +476,296 @@ async fn parse_v2_coin( PROCESSOR_UNKNOWN_TYPE_COUNT .with_label_values(&["FungibleAssetProcessor"]) .inc(); - continue; - }, - }; - let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); - let txn_timestamp = txn - .timestamp - .as_ref() - .expect("Transaction timestamp doesn't exist!") - .seconds; - #[allow(deprecated)] - let txn_timestamp = - NaiveDateTime::from_timestamp_opt(txn_timestamp, 0).expect("Txn Timestamp is invalid!"); - let txn_epoch = txn.epoch as i64; - - let default = vec![]; - let (events, user_request, entry_function_id_str) = match txn_data { - TxnData::BlockMetadata(tx_inner) => (&tx_inner.events, None, None), - TxnData::Validator(tx_inner) => (&tx_inner.events, None, None), - TxnData::Genesis(tx_inner) => (&tx_inner.events, None, None), - TxnData::User(tx_inner) => { - let user_request = tx_inner - .request - .as_ref() - .expect("Sends is not present in user txn"); - let entry_function_id_str = get_entry_function_from_user_request(user_request); - (&tx_inner.events, Some(user_request), entry_function_id_str) - }, - _ => (&default, None, None), - }; - - // This is because v1 events (deposit/withdraw) don't have coin type so the only way is to match - // the event to the resource using the event guid - let mut event_to_v1_coin_type: EventToCoinType = AHashMap::new(); - - // First loop to get all objects - // Need to do a first pass to get all the objects - for wsc in transaction_info.changes.iter() { - if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { - if let Some(object) = - ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() - { - fungible_asset_object_helper.insert( - standardize_address(&wr.address.to_string()), - ObjectAggregatedData { - object, - ..ObjectAggregatedData::default() - }, - ); - } + return (vec![], vec![], vec![], AHashMap::new(), AHashMap::new()); } - } - // Loop to get the metadata relevant to parse v1 and v2. - // As an optimization, we also handle v1 balances in the process - for (index, wsc) in transaction_info.changes.iter().enumerate() { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - if let Some((balance, current_balance, event_to_coin)) = - FungibleAssetBalance::get_v1_from_write_resource( - write_resource, - index as i64, - txn_version, - txn_timestamp, - ) - .unwrap() - { - fungible_asset_balances.push(balance); - current_fungible_asset_balances - .insert(current_balance.storage_id.clone(), current_balance.clone()); - event_to_v1_coin_type.extend(event_to_coin); - } - // Fill the v2 object metadata - let address = standardize_address(&write_resource.address.to_string()); - if let Some(aggregated_data) = fungible_asset_object_helper.get_mut(&address) { - if let Some(fungible_asset_metadata) = - FungibleAssetMetadata::from_write_resource(write_resource, txn_version) - .unwrap() - { - aggregated_data.fungible_asset_metadata = Some(fungible_asset_metadata); - } - if let Some(fungible_asset_store) = - FungibleAssetStore::from_write_resource(write_resource, txn_version) - .unwrap() - { - aggregated_data.fungible_asset_store = Some(fungible_asset_store); - } - if let Some(fungible_asset_supply) = - FungibleAssetSupply::from_write_resource(write_resource, txn_version) - .unwrap() + let txn_data = txn.txn_data.as_ref().unwrap(); + let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); + let txn_timestamp = txn + .timestamp + .as_ref() + .expect("Transaction timestamp doesn't exist!") + .seconds; + #[allow(deprecated)] + let txn_timestamp = NaiveDateTime::from_timestamp_opt(txn_timestamp, 0) + .expect("Txn Timestamp is invalid!"); + let txn_epoch = txn.epoch as i64; + + let default = vec![]; + let (events, user_request, entry_function_id_str) = match txn_data { + TxnData::BlockMetadata(tx_inner) => (&tx_inner.events, None, None), + TxnData::Validator(tx_inner) => (&tx_inner.events, None, None), + TxnData::Genesis(tx_inner) => (&tx_inner.events, None, None), + TxnData::User(tx_inner) => { + let user_request = tx_inner + .request + .as_ref() + .expect("Sends is not present in user txn"); + let entry_function_id_str = get_entry_function_from_user_request(user_request); + (&tx_inner.events, Some(user_request), entry_function_id_str) + }, + _ => (&default, None, None), + }; + + // This is because v1 events (deposit/withdraw) don't have coin type so the only way is to match + // the event to the resource using the event guid + let mut event_to_v1_coin_type: EventToCoinType = AHashMap::new(); + + // First loop to get all objects + // Need to do a first pass to get all the objects + for wsc in transaction_info.changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + if let Some(object) = + ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() { - aggregated_data.fungible_asset_supply = Some(fungible_asset_supply); + fungible_asset_object_helper.insert( + standardize_address(&wr.address.to_string()), + ObjectAggregatedData { + object, + ..ObjectAggregatedData::default() + }, + ); } - if let Some(concurrent_fungible_asset_supply) = - ConcurrentFungibleAssetSupply::from_write_resource( + } + } + // Loop to get the metadata relevant to parse v1 and v2. + // As an optimization, we also handle v1 balances in the process + for (index, wsc) in transaction_info.changes.iter().enumerate() { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + if let Some((balance, current_balance, event_to_coin)) = + FungibleAssetBalance::get_v1_from_write_resource( write_resource, + index as i64, txn_version, + txn_timestamp, ) .unwrap() { - aggregated_data.concurrent_fungible_asset_supply = - Some(concurrent_fungible_asset_supply); + fungible_asset_balances.push(balance); + current_fungible_asset_balances + .insert(current_balance.storage_id.clone(), current_balance.clone()); + event_to_v1_coin_type.extend(event_to_coin); } - if let Some(concurrent_fungible_asset_balance) = - ConcurrentFungibleAssetBalance::from_write_resource( - write_resource, + // Fill the v2 object metadata + let address = standardize_address(&write_resource.address.to_string()); + if let Some(aggregated_data) = fungible_asset_object_helper.get_mut(&address) { + if let Some(fungible_asset_metadata) = + FungibleAssetMetadata::from_write_resource(write_resource, txn_version) + .unwrap() + { + aggregated_data.fungible_asset_metadata = Some(fungible_asset_metadata); + } + if let Some(fungible_asset_store) = + FungibleAssetStore::from_write_resource(write_resource, txn_version) + .unwrap() + { + aggregated_data.fungible_asset_store = Some(fungible_asset_store); + } + if let Some(fungible_asset_supply) = + FungibleAssetSupply::from_write_resource(write_resource, txn_version) + .unwrap() + { + aggregated_data.fungible_asset_supply = Some(fungible_asset_supply); + } + if let Some(concurrent_fungible_asset_supply) = + ConcurrentFungibleAssetSupply::from_write_resource( + write_resource, + txn_version, + ) + .unwrap() + { + aggregated_data.concurrent_fungible_asset_supply = + Some(concurrent_fungible_asset_supply); + } + if let Some(concurrent_fungible_asset_balance) = + ConcurrentFungibleAssetBalance::from_write_resource( + write_resource, + txn_version, + ) + .unwrap() + { + aggregated_data.concurrent_fungible_asset_balance = + Some(concurrent_fungible_asset_balance); + } + if let Some(untransferable) = + Untransferable::from_write_resource(write_resource, txn_version) + .unwrap() + { + aggregated_data.untransferable = Some(untransferable); + } + } + } else if let Change::DeleteResource(delete_resource) = wsc.change.as_ref().unwrap() + { + if let Some((balance, current_balance, event_to_coin)) = + FungibleAssetBalance::get_v1_from_delete_resource( + delete_resource, + index as i64, txn_version, + txn_timestamp, ) .unwrap() { - aggregated_data.concurrent_fungible_asset_balance = - Some(concurrent_fungible_asset_balance); - } - if let Some(untransferable) = - Untransferable::from_write_resource(write_resource, txn_version).unwrap() - { - aggregated_data.untransferable = Some(untransferable); + fungible_asset_balances.push(balance); + current_fungible_asset_balances + .insert(current_balance.storage_id.clone(), current_balance.clone()); + event_to_v1_coin_type.extend(event_to_coin); } } - } else if let Change::DeleteResource(delete_resource) = wsc.change.as_ref().unwrap() { - if let Some((balance, current_balance, event_to_coin)) = - FungibleAssetBalance::get_v1_from_delete_resource( - delete_resource, - index as i64, - txn_version, - txn_timestamp, - ) - .unwrap() - { - fungible_asset_balances.push(balance); - current_fungible_asset_balances - .insert(current_balance.storage_id.clone(), current_balance.clone()); - event_to_v1_coin_type.extend(event_to_coin); - } } - } - - // The artificial gas event, only need for v1 - if let Some(req) = user_request { - let fee_statement = events.iter().find_map(|event| { - let event_type = event.type_str.as_str(); - FeeStatement::from_event(event_type, &event.data, txn_version) - }); - let gas_event = FungibleAssetActivity::get_gas_event( - transaction_info, - req, - &entry_function_id_str, - txn_version, - txn_timestamp, - block_height, - fee_statement, - ); - fungible_asset_activities.push(gas_event); - } - // Loop to handle events and collect additional metadata from events for v2 - for (index, event) in events.iter().enumerate() { - if let Some(v1_activity) = FungibleAssetActivity::get_v1_from_event( - event, - txn_version, - block_height, - txn_timestamp, - &entry_function_id_str, - &event_to_v1_coin_type, - index as i64, - ) - .unwrap_or_else(|e| { - tracing::error!( - transaction_version = txn_version, - index = index, - error = ?e, - "[Parser] error parsing fungible asset activity v1"); - panic!("[Parser] error parsing fungible asset activity v1"); - }) { - fungible_asset_activities.push(v1_activity); + // The artificial gas event, only need for v1 + if let Some(req) = user_request { + let fee_statement = events.iter().find_map(|event| { + let event_type = event.type_str.as_str(); + FeeStatement::from_event(event_type, &event.data, txn_version) + }); + let gas_event = FungibleAssetActivity::get_gas_event( + transaction_info, + req, + &entry_function_id_str, + txn_version, + txn_timestamp, + block_height, + fee_statement, + ); + fungible_asset_activities.push(gas_event); } - if let Some(v2_activity) = FungibleAssetActivity::get_v2_from_event( - event, - txn_version, - block_height, - txn_timestamp, - index as i64, - &entry_function_id_str, - &fungible_asset_object_helper, - ) - .await - .unwrap_or_else(|e| { - tracing::error!( - transaction_version = txn_version, - index = index, - error = ?e, - "[Parser] error parsing fungible asset activity v2"); - panic!("[Parser] error parsing fungible asset activity v2"); - }) { - fungible_asset_activities.push(v2_activity); + + // Loop to handle events and collect additional metadata from events for v2 + for (index, event) in events.iter().enumerate() { + if let Some(v1_activity) = FungibleAssetActivity::get_v1_from_event( + event, + txn_version, + block_height, + txn_timestamp, + &entry_function_id_str, + &event_to_v1_coin_type, + index as i64, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible asset activity v1"); + panic!("[Parser] error parsing fungible asset activity v1"); + }) { + fungible_asset_activities.push(v1_activity); + } + if let Some(v2_activity) = FungibleAssetActivity::get_v2_from_event( + event, + txn_version, + block_height, + txn_timestamp, + index as i64, + &entry_function_id_str, + &fungible_asset_object_helper, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible asset activity v2"); + panic!("[Parser] error parsing fungible asset activity v2"); + }) { + fungible_asset_activities.push(v2_activity); + } } - } - // Loop to handle all the other changes - for (index, wsc) in transaction_info.changes.iter().enumerate() { - match wsc.change.as_ref().unwrap() { - Change::WriteResource(write_resource) => { - if let Some(fa_metadata) = - FungibleAssetMetadataModel::get_v1_from_write_resource( - write_resource, + // Loop to handle all the other changes + for (index, wsc) in transaction_info.changes.iter().enumerate() { + match wsc.change.as_ref().unwrap() { + Change::WriteResource(write_resource) => { + if let Some(fa_metadata) = + FungibleAssetMetadataModel::get_v1_from_write_resource( + write_resource, + txn_version, + txn_timestamp, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible metadata v1"); + panic!("[Parser] error parsing fungible metadata v1"); + }) + { + fungible_asset_metadata + .insert(fa_metadata.asset_type.clone(), fa_metadata); + } + if let Some(fa_metadata) = + FungibleAssetMetadataModel::get_v2_from_write_resource( + write_resource, + txn_version, + txn_timestamp, + &fungible_asset_object_helper, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible metadata v2"); + panic!("[Parser] error parsing fungible metadata v2"); + }) + { + fungible_asset_metadata + .insert(fa_metadata.asset_type.clone(), fa_metadata); + } + if let Some((balance, curr_balance)) = + FungibleAssetBalance::get_v2_from_write_resource( + write_resource, + index as i64, + txn_version, + txn_timestamp, + &fungible_asset_object_helper, + ) + .unwrap_or_else(|e| { + tracing::error!( + transaction_version = txn_version, + index = index, + error = ?e, + "[Parser] error parsing fungible balance v2"); + panic!("[Parser] error parsing fungible balance v2"); + }) + { + fungible_asset_balances.push(balance); + current_fungible_asset_balances + .insert(curr_balance.storage_id.clone(), curr_balance); + } + }, + Change::WriteTableItem(table_item) => { + if let Some(coin_supply) = CoinSupply::from_write_table_item( + table_item, txn_version, txn_timestamp, + txn_epoch, ) - .unwrap_or_else(|e| { - tracing::error!( - transaction_version = txn_version, - index = index, - error = ?e, - "[Parser] error parsing fungible metadata v1"); - panic!("[Parser] error parsing fungible metadata v1"); - }) - { - fungible_asset_metadata.insert(fa_metadata.asset_type.clone(), fa_metadata); - } - if let Some(fa_metadata) = - FungibleAssetMetadataModel::get_v2_from_write_resource( - write_resource, - txn_version, - txn_timestamp, - &fungible_asset_object_helper, - ) - .unwrap_or_else(|e| { - tracing::error!( - transaction_version = txn_version, - index = index, - error = ?e, - "[Parser] error parsing fungible metadata v2"); - panic!("[Parser] error parsing fungible metadata v2"); - }) - { - fungible_asset_metadata.insert(fa_metadata.asset_type.clone(), fa_metadata); - } - if let Some((balance, curr_balance)) = - FungibleAssetBalance::get_v2_from_write_resource( - write_resource, - index as i64, - txn_version, - txn_timestamp, - &fungible_asset_object_helper, - ) - .await - .unwrap_or_else(|e| { - tracing::error!( - transaction_version = txn_version, - index = index, - error = ?e, - "[Parser] error parsing fungible balance v2"); - panic!("[Parser] error parsing fungible balance v2"); - }) - { - fungible_asset_balances.push(balance); - current_fungible_asset_balances - .insert(curr_balance.storage_id.clone(), curr_balance); - } - }, - Change::WriteTableItem(table_item) => { - if let Some(coin_supply) = CoinSupply::from_write_table_item( - table_item, - txn_version, - txn_timestamp, - txn_epoch, - ) - .unwrap() - { - all_coin_supply.push(coin_supply); - } - }, - _ => {}, + .unwrap() + { + all_coin_supply.push(coin_supply); + } + }, + _ => {}, + } } - } + ( + fungible_asset_activities, + fungible_asset_balances, + all_coin_supply, + current_fungible_asset_balances, + fungible_asset_metadata, + ) + }) + .collect(); + + for (faa, fab, acs, cfab, fam) in data { + fungible_asset_activities.extend(faa); + fungible_asset_balances.extend(fab); + all_coin_supply.extend(acs); + current_fungible_asset_balances.extend(cfab); + fungible_asset_metadata.extend(fam); } // Boilerplate after this