From a553ebdf0bf79e0d7f4932a4d2c106bc57b1a6d9 Mon Sep 17 00:00:00 2001
From: bowenyang007
Date: Mon, 23 Sep 2024 16:21:43 -0700
Subject: [PATCH] [parquet] Parse fa activities with parquet (#506)

* parse fa activities with parquet

* add new processor for fa activities

* Yuunlimm/parquet fa migrate (#509)

* fix lint

---------

Co-authored-by: Yuun Lim <38443641+yuunlimm@users.noreply.github.com>
---
 .../models/fungible_asset_models/mod.rs       |   1 +
 .../parquet_v2_fungible_asset_activities.rs   | 275 +++++++++++++
 rust/processor/src/processors/mod.rs          |   7 +
 .../src/processors/parquet_processors/mod.rs  |   1 +
 ...uet_fungible_asset_activities_processor.rs | 372 ++++++++++++++++++
 rust/processor/src/worker.rs                  |   8 +
 6 files changed, 664 insertions(+)
 create mode 100644 rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_activities.rs
 create mode 100644 rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs

diff --git a/rust/processor/src/db/common/models/fungible_asset_models/mod.rs b/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
index 0a7df8e71..cfe2fb296 100644
--- a/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
+++ b/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
@@ -8,4 +8,5 @@ pub mod v2_fungible_metadata;
 
 // parquet models
 pub mod parquet_coin_supply;
+pub mod parquet_v2_fungible_asset_activities;
 pub mod parquet_v2_fungible_asset_balances;
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_activities.rs b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_activities.rs
new file mode 100644
index 000000000..922db71f7
--- /dev/null
+++ b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_activities.rs
@@ -0,0 +1,275 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use super::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent};
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    db::common::models::{
+        coin_models::{
+            coin_activities::CoinActivity,
+            coin_utils::{CoinEvent, CoinInfoType, EventGuidResource},
+        },
+        object_models::v2_object_utils::ObjectAggregatedDataMapping,
+        token_v2_models::v2_token_utils::TokenStandard,
+    },
+    utils::util::{bigdecimal_to_u64, standardize_address},
+};
+use ahash::AHashMap;
+use allocative::Allocative;
+use anyhow::Context;
+use aptos_protos::transaction::v1::{Event, TransactionInfo, UserTransactionRequest};
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+pub const GAS_FEE_EVENT: &str = "0x1::aptos_coin::GasFeeEvent";
+// We will never have a negative number on chain so this will avoid collision in postgres
+pub const BURN_GAS_EVENT_CREATION_NUM: i64 = -1;
+pub const BURN_GAS_EVENT_INDEX: i64 = -1;
+
+pub type OwnerAddress = String;
+pub type CoinType = String;
+// Primary key of the current_coin_balances table, i.e. (owner_address, coin_type)
+pub type CurrentCoinBalancePK = (OwnerAddress, CoinType);
+pub type EventToCoinType = AHashMap<EventGuidResource, CoinType>;
+
+/// TODO: This is just a copy of v2_fungible_asset_activities.rs. We should unify the 2 implementations
+/// and have parquet as an output.
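+/// Flattened activity row for the parquet output: `amount` is kept as a string representation
+/// of the on-chain u128 value, and the storage refund is stored as `storage_refund_octa` (u64).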
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct FungibleAssetActivity {
+    pub txn_version: i64,
+    pub event_index: i64,
+    pub owner_address: Option<String>,
+    pub storage_id: String,
+    pub asset_type: Option<String>,
+    pub is_frozen: Option<bool>,
+    pub amount: Option<String>, // it is a string representation of the u128
+    pub event_type: String,
+    pub is_gas_fee: bool,
+    pub gas_fee_payer_address: Option<String>,
+    pub is_transaction_success: bool,
+    pub entry_function_id_str: Option<String>,
+    pub block_height: i64,
+    pub token_standard: String,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub storage_refund_octa: u64,
+}
+
+impl NamedTable for FungibleAssetActivity {
+    const TABLE_NAME: &'static str = "fungible_asset_activities";
+}
+
+impl HasVersion for FungibleAssetActivity {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for FungibleAssetActivity {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl FungibleAssetActivity {
+    pub fn get_v2_from_event(
+        event: &Event,
+        txn_version: i64,
+        block_height: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        event_index: i64,
+        entry_function_id_str: &Option<String>,
+        object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
+    ) -> anyhow::Result<Option<Self>> {
+        let event_type = event.type_str.clone();
+        if let Some(fa_event) =
+            &FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)?
+        {
+            let (storage_id, is_frozen, amount) = match fa_event {
+                FungibleAssetEvent::WithdrawEvent(inner) => (
+                    standardize_address(&event.key.as_ref().unwrap().account_address),
+                    None,
+                    Some(inner.amount.to_string()),
+                ),
+                FungibleAssetEvent::DepositEvent(inner) => (
+                    standardize_address(&event.key.as_ref().unwrap().account_address),
+                    None,
+                    Some(inner.amount.to_string()),
+                ),
+                FungibleAssetEvent::FrozenEvent(inner) => (
+                    standardize_address(&event.key.as_ref().unwrap().account_address),
+                    Some(inner.frozen),
+                    None,
+                ),
+                FungibleAssetEvent::WithdrawEventV2(inner) => (
+                    standardize_address(&inner.store),
+                    None,
+                    Some(inner.amount.to_string()),
+                ),
+                FungibleAssetEvent::DepositEventV2(inner) => (
+                    standardize_address(&inner.store),
+                    None,
+                    Some(inner.amount.to_string()),
+                ),
+                FungibleAssetEvent::FrozenEventV2(inner) => {
+                    (standardize_address(&inner.store), Some(inner.frozen), None)
+                },
+            };
+
+            // The event account address will also help us find fungible store which tells us where to find
+            // the metadata
+            let maybe_object_metadata = object_aggregated_data_mapping.get(&storage_id);
+            // The ObjectCore might not exist in the transaction if the object got deleted
+            let maybe_owner_address = maybe_object_metadata
+                .map(|metadata| &metadata.object.object_core)
+                .map(|object_core| object_core.get_owner_address());
+            // The FungibleStore might not exist in the transaction if it's a secondary store that got burnt
+            let maybe_asset_type = maybe_object_metadata
+                .and_then(|metadata| metadata.fungible_asset_store.as_ref())
+                .map(|fa| fa.metadata.get_reference_address());
+
+            return Ok(Some(Self {
+                txn_version,
+                event_index,
+                owner_address: maybe_owner_address,
+                storage_id: storage_id.clone(),
+                asset_type: maybe_asset_type,
+                is_frozen,
+                amount,
+                event_type: event_type.clone(),
+                is_gas_fee: false,
+                gas_fee_payer_address: None,
+                is_transaction_success: true,
+                entry_function_id_str: entry_function_id_str.clone(),
+                block_height,
+                token_standard: TokenStandard::V2.to_string(),
+                block_timestamp: txn_timestamp,
+                storage_refund_octa: 0,
+            }));
+        }
+        Ok(None)
+    }
+
+    pub fn get_v1_from_event(
+        event: &Event,
+        txn_version: i64,
+        block_height: i64,
+        block_timestamp: chrono::NaiveDateTime,
+        entry_function_id_str: &Option<String>,
+        event_to_coin_type: &EventToCoinType,
+        event_index: i64,
+    ) -> anyhow::Result<Option<Self>> {
+        if let Some(inner) =
+            CoinEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
+        {
+            let (owner_address, amount, coin_type_option) = match inner {
+                CoinEvent::WithdrawCoinEvent(inner) => (
+                    standardize_address(&event.key.as_ref().unwrap().account_address),
+                    inner.amount.to_string(),
+                    None,
+                ),
+                CoinEvent::DepositCoinEvent(inner) => (
+                    standardize_address(&event.key.as_ref().unwrap().account_address),
+                    inner.amount.to_string(),
+                    None,
+                ),
+            };
+            let coin_type = if let Some(coin_type) = coin_type_option {
+                coin_type
+            } else {
+                let event_key = event.key.as_ref().context("event must have a key")?;
+                let event_move_guid = EventGuidResource {
+                    addr: standardize_address(event_key.account_address.as_str()),
+                    creation_num: event_key.creation_number as i64,
+                };
+                // Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists.
+                // If it doesn't exist, skip.
+                match event_to_coin_type.get(&event_move_guid) {
+                    Some(coin_type) => coin_type.clone(),
+                    None => {
+                        tracing::warn!(
+                            "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
+                            txn_version, event_move_guid, event_to_coin_type
+                        );
+                        return Ok(None);
+                    },
+                }
+            };
+
+            let storage_id =
+                CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
+
+            Ok(Some(Self {
+                txn_version,
+                event_index,
+                owner_address: Some(owner_address),
+                storage_id,
+                asset_type: Some(coin_type),
+                is_frozen: None,
+                amount: Some(amount),
+                event_type: event.type_str.clone(),
+                is_gas_fee: false,
+                gas_fee_payer_address: None,
+                is_transaction_success: true,
+                entry_function_id_str: entry_function_id_str.clone(),
+                block_height,
+                token_standard: TokenStandard::V1.to_string(),
+                block_timestamp,
+                storage_refund_octa: 0,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Artificially creates a gas event. If it's a fee payer, still show gas event to the sender
+    /// but with an extra field to indicate the fee payer.
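+    /// The row is built by delegating to `CoinActivity::get_gas_event` and remapping the result
+    /// to the parquet schema (the BigDecimal storage refund becomes u64 octas via `bigdecimal_to_u64`).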
+    pub fn get_gas_event(
+        txn_info: &TransactionInfo,
+        user_transaction_request: &UserTransactionRequest,
+        entry_function_id_str: &Option<String>,
+        txn_version: i64,
+        block_timestamp: chrono::NaiveDateTime,
+        block_height: i64,
+        fee_statement: Option<FeeStatement>,
+    ) -> Self {
+        let v1_activity = CoinActivity::get_gas_event(
+            txn_info,
+            user_transaction_request,
+            entry_function_id_str,
+            txn_version,
+            block_timestamp,
+            block_height,
+            fee_statement,
+        );
+        let storage_id = CoinInfoType::get_storage_id(
+            v1_activity.coin_type.as_str(),
+            v1_activity.owner_address.as_str(),
+        );
+        Self {
+            txn_version,
+            event_index: v1_activity.event_index.unwrap(),
+            owner_address: Some(v1_activity.owner_address),
+            storage_id,
+            asset_type: Some(v1_activity.coin_type),
+            is_frozen: None,
+            amount: Some(v1_activity.amount.to_string()),
+            event_type: v1_activity.activity_type,
+            is_gas_fee: v1_activity.is_gas_fee,
+            gas_fee_payer_address: v1_activity.gas_fee_payer_address,
+            is_transaction_success: v1_activity.is_transaction_success,
+            entry_function_id_str: v1_activity.entry_function_id_str,
+            block_height,
+            token_standard: TokenStandard::V1.to_string(),
+            block_timestamp,
+            storage_refund_octa: bigdecimal_to_u64(&v1_activity.storage_refund_amount),
+        }
+    }
+}
diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs
index 508b91e88..a04b0904b 100644
--- a/rust/processor/src/processors/mod.rs
+++ b/rust/processor/src/processors/mod.rs
@@ -39,6 +39,9 @@ use crate::{
         parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
         parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
         parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
+        parquet_fungible_asset_activities_processor::{
+            ParquetFungibleAssetActivitiesProcessor, ParquetFungibleAssetActivitiesProcessorConfig,
+        },
         parquet_fungible_asset_processor::{
             ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
         },
@@ -99,6 +102,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
     /// Gets the connection.
     /// If it was unable to do so (default timeout: 30s), it will keep retrying until it can.
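+    // `elided_named_lifetimes` is only recognized by newer toolchains; `unknown_lints` keeps
+    // older compilers from warning about the allow that follows.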
+    #[allow(unknown_lints)]
     #[allow(elided_named_lifetimes)]
     async fn get_conn(&self) -> DbPoolConnection {
         let pool = self.connection_pool();
@@ -201,6 +205,7 @@ pub enum ProcessorConfig {
     TransactionMetadataProcessor,
     UserTransactionProcessor,
     ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
+    ParquetFungibleAssetActivitiesProcessor(ParquetFungibleAssetActivitiesProcessorConfig),
     ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
     ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
     ParquetAnsProcessor(ParquetAnsProcessorConfig),
@@ -224,6 +229,7 @@ impl ProcessorConfig {
                 | ProcessorConfig::ParquetAnsProcessor(_)
                 | ProcessorConfig::ParquetEventsProcessor(_)
                 | ProcessorConfig::ParquetTokenV2Processor(_)
+                | ProcessorConfig::ParquetFungibleAssetActivitiesProcessor(_)
         )
     }
 }
@@ -262,6 +268,7 @@ pub enum Processor {
     UserTransactionProcessor,
     // Parquet processors
     ParquetDefaultProcessor,
+    ParquetFungibleAssetActivitiesProcessor,
     ParquetFungibleAssetProcessor,
     ParquetTransactionMetadataProcessor,
     ParquetAnsProcessor,
diff --git a/rust/processor/src/processors/parquet_processors/mod.rs b/rust/processor/src/processors/parquet_processors/mod.rs
index 833dd8689..37cc45554 100644
--- a/rust/processor/src/processors/parquet_processors/mod.rs
+++ b/rust/processor/src/processors/parquet_processors/mod.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 pub mod parquet_ans_processor;
 pub mod parquet_default_processor;
 pub mod parquet_events_processor;
+pub mod parquet_fungible_asset_activities_processor;
 pub mod parquet_fungible_asset_processor;
 pub mod parquet_token_v2_processor;
 pub mod parquet_transaction_metadata_processor;
diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs
new file mode 100644
index 000000000..0bbf0b5bc
--- /dev/null
+++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_activities_processor.rs
@@ -0,0 +1,372 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::ParquetProcessorTrait;
+use crate::{
+    bq_analytics::{
+        create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
+        ParquetProcessingResult,
+    },
+    db::common::models::{
+        fungible_asset_models::{
+            parquet_v2_fungible_asset_activities::{EventToCoinType, FungibleAssetActivity},
+            parquet_v2_fungible_asset_balances::FungibleAssetBalance,
+            v2_fungible_asset_utils::{
+                ConcurrentFungibleAssetBalance, ConcurrentFungibleAssetSupply, FeeStatement,
+                FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply,
+            },
+        },
+        object_models::v2_object_utils::{
+            ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, Untransferable,
+        },
+    },
+    gap_detectors::ProcessingResult,
+    processors::{ProcessorName, ProcessorTrait},
+    utils::{
+        database::ArcDbPool,
+        util::{get_entry_function_from_user_request, standardize_address},
+    },
+};
+use ahash::AHashMap;
+use anyhow::anyhow;
+use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction};
+use async_trait::async_trait;
+use chrono::NaiveDateTime;
+use kanal::AsyncSender;
+use serde::{Deserialize, Serialize};
+use std::{fmt::Debug, time::Duration};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct ParquetFungibleAssetActivitiesProcessorConfig {
+    pub google_application_credentials: Option<String>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub parquet_handler_response_channel_size: usize,
+    pub max_buffer_size: usize,
+    pub parquet_upload_interval: u64,
+}
+
+impl ParquetProcessorTrait for ParquetFungibleAssetActivitiesProcessorConfig {
+    fn parquet_upload_interval_in_secs(&self) -> Duration {
+        Duration::from_secs(self.parquet_upload_interval)
+    }
+}
+
+pub struct ParquetFungibleAssetActivitiesProcessor {
+    connection_pool: ArcDbPool,
+    fungible_asset_activities_sender: AsyncSender<ParquetDataGeneric<FungibleAssetActivity>>,
+}
+
+impl ParquetFungibleAssetActivitiesProcessor {
+    pub fn new(
+        connection_pool: ArcDbPool,
+        config: ParquetFungibleAssetActivitiesProcessorConfig,
+        new_gap_detector_sender: AsyncSender<ProcessingResult>,
+    ) -> Self {
+        config.set_google_credentials(config.google_application_credentials.clone());
+
+        let fungible_asset_activities_sender: AsyncSender<
+            ParquetDataGeneric<FungibleAssetActivity>,
+        > = create_parquet_handler_loop::<FungibleAssetActivity>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetFungibleAssetActivitiesProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        Self {
+            connection_pool,
+            fungible_asset_activities_sender,
+        }
+    }
+}
+
+impl Debug for ParquetFungibleAssetActivitiesProcessor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ParquetFungibleAssetActivitiesProcessor {{ capacity of fungible_asset_activites channel: {:?}}}",
+            &self.fungible_asset_activities_sender.capacity(),
+        )
+    }
+}
+
+#[async_trait]
+impl ProcessorTrait for ParquetFungibleAssetActivitiesProcessor {
+    fn name(&self) -> &'static str {
+        ProcessorName::ParquetFungibleAssetActivitiesProcessor.into()
+    }
+
+    async fn process_transactions(
+        &self,
+        transactions: Vec<Transaction>,
+        start_version: u64,
+        end_version: u64,
+        _: Option<u64>,
+    ) -> anyhow::Result<ProcessingResult> {
+        let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
+        let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();
+
+        let fungible_asset_activities =
+            parse_activities(&transactions, &mut transaction_version_to_struct_count).await;
+        let parquet_fungible_asset_activities = ParquetDataGeneric {
+            data: fungible_asset_activities,
+        };
+
+        self.fungible_asset_activities_sender
+            .send(parquet_fungible_asset_activities)
+            .await
+            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
+
+        Ok(ProcessingResult::ParquetProcessingResult(
+            ParquetProcessingResult {
+                start_version: start_version as i64,
+                end_version: end_version as i64,
+                last_transaction_timestamp: last_transaction_timestamp.clone(),
+                txn_version_to_struct_count: Some(transaction_version_to_struct_count),
+                parquet_processed_structs: None,
+                table_name: "".to_string(),
+            },
+        ))
+    }
+
+    fn connection_pool(&self) -> &ArcDbPool {
+        &self.connection_pool
+    }
+}
+
+async fn parse_activities(
+    transactions: &[Transaction],
+    transaction_version_to_struct_count: &mut AHashMap<i64, i64>,
+) -> Vec<FungibleAssetActivity> {
+    let mut fungible_asset_activities = vec![];
+
+    let data: Vec<_> = transactions
+        .iter()
+        .map(|txn| {
+            let mut fungible_asset_activities = vec![];
+
+            // Get Metadata for fungible assets by object
+            let mut fungible_asset_object_helper: ObjectAggregatedDataMapping = AHashMap::new();
+
+            let txn_version = txn.version as i64;
+            let block_height = txn.block_height as i64;
+            if txn.txn_data.is_none() {
+                tracing::warn!(
+                    transaction_version = txn_version,
+                    "Transaction data doesn't exist"
+                );
+                return vec![];
+            }
+            let txn_data = txn.txn_data.as_ref().unwrap();
+            let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!");
+            let txn_timestamp = txn
+                .timestamp
+                .as_ref()
+                .expect("Transaction timestamp doesn't exist!")
+                .seconds;
+            #[allow(deprecated)]
+            let txn_timestamp = NaiveDateTime::from_timestamp_opt(txn_timestamp, 0)
+                .expect("Txn Timestamp is invalid!");
+
+            let default = vec![];
+            let (events, user_request, entry_function_id_str) = match txn_data {
+                TxnData::BlockMetadata(tx_inner) => (&tx_inner.events, None, None),
+                TxnData::Validator(tx_inner) => (&tx_inner.events, None, None),
+                TxnData::Genesis(tx_inner) => (&tx_inner.events, None, None),
+                TxnData::User(tx_inner) => {
+                    let user_request = tx_inner
+                        .request
+                        .as_ref()
+                        .expect("Sends is not present in user txn");
+                    let entry_function_id_str = get_entry_function_from_user_request(user_request);
+                    (&tx_inner.events, Some(user_request), entry_function_id_str)
+                },
+                _ => (&default, None, None),
+            };
+
+            // This is because v1 events (deposit/withdraw) don't have coin type so the only way is to match
+            // the event to the resource using the event guid
+            let mut event_to_v1_coin_type: EventToCoinType = AHashMap::new();
+
+            // First loop to get all objects
+            // Need to do a first pass to get all the objects
+            for wsc in transaction_info.changes.iter() {
+                if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() {
+                    if let Some(object) =
+                        ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap()
+                    {
+                        fungible_asset_object_helper.insert(
+                            standardize_address(&wr.address.to_string()),
+                            ObjectAggregatedData {
+                                object,
+                                ..ObjectAggregatedData::default()
+                            },
+                        );
+                    }
+                }
+            }
+            // Loop to get the metadata relevant to parse v1 and v2.
+            // As an optimization, we also handle v1 balances in the process
+            for (index, wsc) in transaction_info.changes.iter().enumerate() {
+                if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() {
+                    if let Some((_, _, event_to_coin)) =
+                        FungibleAssetBalance::get_v1_from_write_resource(
+                            write_resource,
+                            index as i64,
+                            txn_version,
+                            txn_timestamp,
+                        )
+                        .unwrap()
+                    {
+                        event_to_v1_coin_type.extend(event_to_coin);
+                    }
+                    // Fill the v2 object metadata
+                    let address = standardize_address(&write_resource.address.to_string());
+                    if let Some(aggregated_data) = fungible_asset_object_helper.get_mut(&address) {
+                        if let Some(fungible_asset_metadata) =
+                            FungibleAssetMetadata::from_write_resource(write_resource, txn_version)
+                                .unwrap()
+                        {
+                            aggregated_data.fungible_asset_metadata = Some(fungible_asset_metadata);
+                        }
+                        if let Some(fungible_asset_store) =
+                            FungibleAssetStore::from_write_resource(write_resource, txn_version)
+                                .unwrap()
+                        {
+                            aggregated_data.fungible_asset_store = Some(fungible_asset_store);
+                        }
+                        if let Some(fungible_asset_supply) =
+                            FungibleAssetSupply::from_write_resource(write_resource, txn_version)
+                                .unwrap()
+                        {
+                            aggregated_data.fungible_asset_supply = Some(fungible_asset_supply);
+                        }
+                        if let Some(concurrent_fungible_asset_supply) =
+                            ConcurrentFungibleAssetSupply::from_write_resource(
+                                write_resource,
+                                txn_version,
+                            )
+                            .unwrap()
+                        {
+                            aggregated_data.concurrent_fungible_asset_supply =
+                                Some(concurrent_fungible_asset_supply);
+                        }
+                        if let Some(concurrent_fungible_asset_balance) =
+                            ConcurrentFungibleAssetBalance::from_write_resource(
+                                write_resource,
+                                txn_version,
+                            )
+                            .unwrap()
+                        {
+                            aggregated_data.concurrent_fungible_asset_balance =
+                                Some(concurrent_fungible_asset_balance);
+                        }
+                        if let Some(untransferable) =
+                            Untransferable::from_write_resource(write_resource, txn_version)
+                                .unwrap()
+                        {
+                            aggregated_data.untransferable = Some(untransferable);
+                        }
+                    }
+                } else if let Change::DeleteResource(delete_resource) = wsc.change.as_ref().unwrap()
+                {
+                    if let Some((_, _, event_to_coin)) =
+                        FungibleAssetBalance::get_v1_from_delete_resource(
+                            delete_resource,
+                            index as i64,
+                            txn_version,
+                            txn_timestamp,
+                        )
+                        .unwrap()
+                    {
+                        event_to_v1_coin_type.extend(event_to_coin);
+                    }
+                }
+            }
+
+            // The artificial gas event, only need for v1
+            if let Some(req) = user_request {
+                let fee_statement = events.iter().find_map(|event| {
+                    let event_type = event.type_str.as_str();
+                    FeeStatement::from_event(event_type, &event.data, txn_version)
+                });
+                let gas_event = FungibleAssetActivity::get_gas_event(
+                    transaction_info,
+                    req,
+                    &entry_function_id_str,
+                    txn_version,
+                    txn_timestamp,
+                    block_height,
+                    fee_statement,
+                );
+                fungible_asset_activities.push(gas_event);
+                transaction_version_to_struct_count
+                    .entry(txn_version)
+                    .and_modify(|e| *e += 1)
+                    .or_insert(1);
+            }
+
+            // Loop to handle events and collect additional metadata from events for v2
+            for (index, event) in events.iter().enumerate() {
+                if let Some(v1_activity) = FungibleAssetActivity::get_v1_from_event(
+                    event,
+                    txn_version,
+                    block_height,
+                    txn_timestamp,
+                    &entry_function_id_str,
+                    &event_to_v1_coin_type,
+                    index as i64,
+                )
+                .unwrap_or_else(|e| {
+                    tracing::error!(
+                        transaction_version = txn_version,
+                        index = index,
+                        error = ?e,
+                        "[Parser] error parsing fungible asset activity v1");
+                    panic!("[Parser] error parsing fungible asset activity v1");
+                }) {
+                    fungible_asset_activities.push(v1_activity);
+                    transaction_version_to_struct_count
+                        .entry(txn_version)
+                        .and_modify(|e| *e += 1)
+                        .or_insert(1);
+                }
+                if let Some(v2_activity) = FungibleAssetActivity::get_v2_from_event(
+                    event,
+                    txn_version,
+                    block_height,
+                    txn_timestamp,
+                    index as i64,
+                    &entry_function_id_str,
+                    &fungible_asset_object_helper,
+                )
+                .unwrap_or_else(|e| {
+                    tracing::error!(
+                        transaction_version = txn_version,
+                        index = index,
+                        error = ?e,
+                        "[Parser] error parsing fungible asset activity v2");
+                    panic!("[Parser] error parsing fungible asset activity v2");
+                }) {
+                    fungible_asset_activities.push(v2_activity);
+                    transaction_version_to_struct_count
+                        .entry(txn_version)
+                        .and_modify(|e| *e += 1)
+                        .or_insert(1);
+                }
+            }
+
+            fungible_asset_activities
+        })
+        .collect();
+
+    for faa in data {
+        fungible_asset_activities.extend(faa);
+    }
+    fungible_asset_activities
+}
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index e784d8d72..cbbbd15c1 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -22,6 +22,7 @@ use crate::{
         parquet_ans_processor::ParquetAnsProcessor,
         parquet_default_processor::ParquetDefaultProcessor,
         parquet_events_processor::ParquetEventsProcessor,
+        parquet_fungible_asset_activities_processor::ParquetFungibleAssetActivitiesProcessor,
         parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
         parquet_token_v2_processor::ParquetTokenV2Processor,
         parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
@@ -1004,5 +1005,12 @@ pub fn build_processor(
             config.clone(),
             gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
         )),
+        ProcessorConfig::ParquetFungibleAssetActivitiesProcessor(config) => {
+            Processor::from(ParquetFungibleAssetActivitiesProcessor::new(
+                db_pool,
+                config.clone(),
+                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
+            ))
+        },
     }
 }
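
A minimal wiring sketch for the new processor, for reference. The type, field, and function names come from this patch; everything else (the `db_pool` value, channel capacity, bucket names, buffer size, and upload interval) is an illustrative assumption, not part of the change.

    // Sketch only: assumes an `ArcDbPool` named `db_pool` has already been built by the worker,
    // and that this code lives inside the `processor` crate (paths as used in this patch).
    use crate::processors::parquet_processors::parquet_fungible_asset_activities_processor::{
        ParquetFungibleAssetActivitiesProcessor, ParquetFungibleAssetActivitiesProcessorConfig,
    };

    let config = ParquetFungibleAssetActivitiesProcessorConfig {
        google_application_credentials: None,
        bucket_name: "my-analytics-bucket".to_string(),       // assumed value
        bucket_root: "fungible_asset_activities".to_string(), // assumed value
        parquet_handler_response_channel_size: 100,           // assumed value
        max_buffer_size: 100 * 1024 * 1024,                   // assumed value (bytes)
        parquet_upload_interval: 1800,                        // assumed value (seconds)
    };
    // The processor needs a gap-detector channel, mirroring build_processor in worker.rs.
    let (gap_detector_sender, _gap_detector_receiver) = kanal::bounded_async(100);
    let processor =
        ParquetFungibleAssetActivitiesProcessor::new(db_pool, config, gap_detector_sender);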