From ed72b46066fa61644720eb5dc3f3bb0cf66de7ca Mon Sep 17 00:00:00 2001
From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com>
Date: Thu, 11 Jul 2024 20:00:29 -0700
Subject: [PATCH] add parquet_fungible_asset_processor (#456)

* add parquet_fungible_asset_processor

* lint
---
 .../models/fungible_asset_models/mod.rs       |   4 +
 .../parquet_coin_supply.rs                    | 104 +++++++
 .../parquet_v2_fungible_asset_balances.rs     | 245 ++++++++++++++++
 .../v2_fungible_asset_balances.rs             |   2 +-
 .../processors/fungible_asset_processor.rs    |   6 +-
 rust/processor/src/processors/mod.rs          |  15 +-
 .../src/processors/parquet_processors/mod.rs  |   2 +
 .../parquet_fungible_asset_processor.rs       | 277 ++++++++++++++++++
 rust/processor/src/worker.rs                  |  31 +-
 9 files changed, 669 insertions(+), 17 deletions(-)
 create mode 100644 rust/processor/src/db/common/models/fungible_asset_models/parquet_coin_supply.rs
 create mode 100644 rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs
 create mode 100644 rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs

diff --git a/rust/processor/src/db/common/models/fungible_asset_models/mod.rs b/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
index d1b93659d..0a7df8e71 100644
--- a/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
+++ b/rust/processor/src/db/common/models/fungible_asset_models/mod.rs
@@ -5,3 +5,7 @@ pub mod v2_fungible_asset_activities;
 pub mod v2_fungible_asset_balances;
 pub mod v2_fungible_asset_utils;
 pub mod v2_fungible_metadata;
+
+// parquet models
+pub mod parquet_coin_supply;
+pub mod parquet_v2_fungible_asset_balances;
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/parquet_coin_supply.rs b/rust/processor/src/db/common/models/fungible_asset_models/parquet_coin_supply.rs
new file mode 100644
index 000000000..acc6936bb
--- /dev/null
+++ b/rust/processor/src/db/common/models/fungible_asset_models/parquet_coin_supply.rs
@@ -0,0 +1,104 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    db::common::models::default_models::move_tables::TableItem,
+    utils::util::{hash_str, APTOS_COIN_TYPE_STR},
+};
+use allocative_derive::Allocative;
+use anyhow::Context;
+use aptos_protos::transaction::v1::WriteTableItem;
+use bigdecimal::BigDecimal;
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+const APTOS_COIN_SUPPLY_TABLE_HANDLE: &str =
+    "0x1b854694ae746cdbd8d44186ca4929b2b337df21d1c74633be19b2710552fdca";
+const APTOS_COIN_SUPPLY_TABLE_KEY: &str =
+    "0x619dc29a0aac8fa146714058e8dd6d2d0f3bdf5f6331907bf91f3acd81e6935";
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct CoinSupply {
+    pub txn_version: i64,
+    pub coin_type_hash: String,
+    pub coin_type: String,
+    pub supply: String, // it is a string representation of the u128
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+}
+
+impl NamedTable for CoinSupply {
+    const TABLE_NAME: &'static str = "coin_supply";
+}
+
+impl HasVersion for CoinSupply {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for CoinSupply {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl CoinSupply {
+    /// Currently only supports aptos_coin. Aggregator table details are in CoinInfo, which for aptos_coin appears during genesis.
+    /// We query for the aggregator table details (handle and key) once upon indexer initiation and use them to fetch the supply.
+    pub fn from_write_table_item(
+        write_table_item: &WriteTableItem,
+        txn_version: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+    ) -> anyhow::Result<Option<Self>> {
+        if let Some(data) = &write_table_item.data {
+            // Return early if not aggregator table type
+            if !(data.key_type == "address" && data.value_type == "u128") {
+                return Ok(None);
+            }
+            // Return early if not aggregator table handle
+            if write_table_item.handle.as_str() != APTOS_COIN_SUPPLY_TABLE_HANDLE {
+                return Ok(None);
+            }
+
+            // Convert to TableItem model. Some fields are just placeholders
+            let (table_item_model, _) =
+                TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);
+
+            // Return early if not aptos coin aggregator key
+            let table_key = table_item_model.decoded_key.as_str().unwrap();
+            if table_key != APTOS_COIN_SUPPLY_TABLE_KEY {
+                return Ok(None);
+            }
+            // Everything matches. Get the coin supply
+            let supply = table_item_model
+                .decoded_value
+                .as_ref()
+                .unwrap()
+                .as_str()
+                .unwrap()
+                .parse::<BigDecimal>()
+                .context(format!(
+                    "cannot parse string as u128: {:?}, version {}",
+                    table_item_model.decoded_value.as_ref(),
+                    txn_version
+                ))?;
+            return Ok(Some(Self {
+                txn_version,
+                coin_type_hash: hash_str(APTOS_COIN_TYPE_STR),
+                coin_type: APTOS_COIN_TYPE_STR.to_string(),
+                supply: supply.to_string(),
+                block_timestamp: txn_timestamp,
+            }));
+        }
+        Ok(None)
+    }
+}
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs
new file mode 100644
index 000000000..0c648c7cb
--- /dev/null
+++ b/rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs
@@ -0,0 +1,245 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+#![allow(clippy::unused_unit)]
+
+use crate::{
+    bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
+    db::common::models::{
+        coin_models::coin_utils::{CoinInfoType, CoinResource},
+        fungible_asset_models::{
+            v2_fungible_asset_activities::EventToCoinType,
+            v2_fungible_asset_balances::{
+                get_primary_fungible_store_address, CurrentFungibleAssetBalance,
+            },
+            v2_fungible_asset_utils::FungibleAssetStore,
+        },
+        object_models::v2_object_utils::ObjectAggregatedDataMapping,
+        token_v2_models::v2_token_utils::TokenStandard,
+    },
+    utils::util::standardize_address,
+};
+use ahash::AHashMap;
+use allocative_derive::Allocative;
+use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
+use bigdecimal::{BigDecimal, Zero};
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct FungibleAssetBalance {
+    pub txn_version: i64,
+    pub write_set_change_index: i64,
+    pub storage_id: String,
+    pub owner_address: String,
+    pub asset_type: String,
+    pub is_primary: bool,
+    pub is_frozen: bool,
+    pub amount: String, // it is a string representation of the u128
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub token_standard: String,
+}
+
+impl NamedTable for FungibleAssetBalance {
+    const TABLE_NAME: &'static str = "fungible_asset_balances";
+}
+
+impl HasVersion for FungibleAssetBalance {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for FungibleAssetBalance {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl FungibleAssetBalance {
+    /// Basically just need to index FA Store, but we'll need to look up FA metadata
+    pub async fn get_v2_from_write_resource(
+        write_resource: &WriteResource,
+        write_set_change_index: i64,
+        txn_version: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        object_metadatas: &ObjectAggregatedDataMapping,
+    ) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance)>> {
+        if let Some(inner) = &FungibleAssetStore::from_write_resource(write_resource, txn_version)?
+        {
+            let storage_id = standardize_address(write_resource.address.as_str());
+            // Need to get the object of the store
+            if let Some(object_data) = object_metadatas.get(&storage_id) {
+                let object = &object_data.object.object_core;
+                let owner_address = object.get_owner_address();
+                let asset_type = inner.metadata.get_reference_address();
+                let is_primary = Self::is_primary(&owner_address, &asset_type, &storage_id);
+
+                let concurrent_balance = object_data
+                    .concurrent_fungible_asset_balance
+                    .as_ref()
+                    .map(|concurrent_fungible_asset_balance| {
+                        concurrent_fungible_asset_balance.balance.value.clone()
+                    });
+
+                let coin_balance = Self {
+                    txn_version,
+                    write_set_change_index,
+                    storage_id: storage_id.clone(),
+                    owner_address: owner_address.clone(),
+                    asset_type: asset_type.clone(),
+                    is_primary,
+                    is_frozen: inner.frozen,
+                    amount: concurrent_balance
+                        .clone()
+                        .unwrap_or_else(|| inner.balance.clone())
+                        .to_string(),
+                    block_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V2.to_string(),
+                };
+                let current_coin_balance = CurrentFungibleAssetBalance {
+                    storage_id,
+                    owner_address,
+                    asset_type: asset_type.clone(),
+                    is_primary,
+                    is_frozen: inner.frozen,
+                    amount: concurrent_balance.unwrap_or_else(|| inner.balance.clone()),
+                    last_transaction_version: txn_version,
+                    last_transaction_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V2.to_string(),
+                };
+                return Ok(Some((coin_balance, current_coin_balance)));
+            }
+        }
+
+        Ok(None)
+    }
+
+    pub fn get_v1_from_delete_resource(
+        delete_resource: &DeleteResource,
+        write_set_change_index: i64,
+        txn_version: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+    ) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
+        if let Some(CoinResource::CoinStoreDeletion) =
+            &CoinResource::from_delete_resource(delete_resource, txn_version)?
+        {
+            let coin_info_type = &CoinInfoType::from_move_type(
+                &delete_resource.r#type.as_ref().unwrap().generic_type_params[0],
+                delete_resource.type_str.as_ref(),
+                txn_version,
+            );
+            if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
+                let owner_address = standardize_address(delete_resource.address.as_str());
+                let storage_id =
+                    CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
+                let coin_balance = Self {
+                    txn_version,
+                    write_set_change_index,
+                    storage_id: storage_id.clone(),
+                    owner_address: owner_address.clone(),
+                    asset_type: coin_type.clone(),
+                    is_primary: true,
+                    is_frozen: false,
+                    amount: "0".to_string(),
+                    block_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V1.to_string(),
+                };
+                let current_coin_balance = CurrentFungibleAssetBalance {
+                    storage_id,
+                    owner_address,
+                    asset_type: coin_type.clone(),
+                    is_primary: true,
+                    is_frozen: false,
+                    amount: BigDecimal::zero(),
+                    last_transaction_version: txn_version,
+                    last_transaction_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V1.to_string(),
+                };
+                return Ok(Some((
+                    coin_balance,
+                    current_coin_balance,
+                    AHashMap::default(),
+                )));
+            }
+        }
+        Ok(None)
+    }
+
+    /// Getting coin balances from resources for v1.
+    /// If the fully qualified coin type is too long (currently 1000 characters), we exclude it from indexing.
+    pub fn get_v1_from_write_resource(
+        write_resource: &WriteResource,
+        write_set_change_index: i64,
+        txn_version: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+    ) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
+        if let Some(CoinResource::CoinStoreResource(inner)) =
+            &CoinResource::from_write_resource(write_resource, txn_version)?
+        {
+            let coin_info_type = &CoinInfoType::from_move_type(
+                &write_resource.r#type.as_ref().unwrap().generic_type_params[0],
+                write_resource.type_str.as_ref(),
+                txn_version,
+            );
+            if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
+                let owner_address = standardize_address(write_resource.address.as_str());
+                let storage_id =
+                    CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
+                let coin_balance = Self {
+                    txn_version,
+                    write_set_change_index,
+                    storage_id: storage_id.clone(),
+                    owner_address: owner_address.clone(),
+                    asset_type: coin_type.clone(),
+                    is_primary: true,
+                    is_frozen: inner.frozen,
+                    amount: inner.coin.value.clone().to_string(),
+                    block_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V1.to_string(),
+                };
+                let current_coin_balance = CurrentFungibleAssetBalance {
+                    storage_id,
+                    owner_address,
+                    asset_type: coin_type.clone(),
+                    is_primary: true,
+                    is_frozen: inner.frozen,
+                    amount: inner.coin.value.clone(),
+                    last_transaction_version: txn_version,
+                    last_transaction_timestamp: txn_timestamp,
+                    token_standard: TokenStandard::V1.to_string(),
+                };
+                let event_to_coin_mapping: EventToCoinType = AHashMap::from([
+                    (
+                        inner.withdraw_events.guid.id.get_standardized(),
+                        coin_type.clone(),
+                    ),
+                    (inner.deposit_events.guid.id.get_standardized(), coin_type),
+                ]);
+                return Ok(Some((
+                    coin_balance,
+                    current_coin_balance,
+                    event_to_coin_mapping,
+                )));
+            }
+        }
+        Ok(None)
+    }
+
+    /// Primary store addresses are derived from the owner address and object address in this format: sha3_256([source | object addr | 0xFC]).
+    /// This function expects the addresses to have length 66
+    pub fn is_primary(
+        owner_address: &str,
+        metadata_address: &str,
+        fungible_store_address: &str,
+    ) -> bool {
+        fungible_store_address
+            == get_primary_fungible_store_address(owner_address, metadata_address).unwrap()
+    }
+}
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs
index 42e317f18..648dc4da9 100644
--- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs
+++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs
@@ -97,7 +97,7 @@ fn get_paired_metadata_address(coin_type_name: &str) -> String {
     }
 }
 
-fn get_primary_fungible_store_address(
+pub fn get_primary_fungible_store_address(
     owner_address: &str,
     metadata_address: &str,
 ) -> anyhow::Result<String> {
diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs
index 955bd0052..a4219be22 100644
--- a/rust/processor/src/processors/fungible_asset_processor.rs
+++ b/rust/processor/src/processors/fungible_asset_processor.rs
@@ -365,7 +365,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
             mut fungible_asset_balances,
             mut current_fungible_asset_balances,
             current_unified_fungible_asset_balances,
-            mut coin_supply,
+            coin_supply,
         ) = parse_v2_coin(&transactions).await;
 
         let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
@@ -389,10 +389,6 @@ impl ProcessorTrait for FungibleAssetProcessor {
             current_fungible_asset_balances.clear();
         }
 
-        if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) {
-            coin_supply.clear();
-        }
-
         let tx_result = insert_to_db(
             self.get_pool(),
             self.name(),
diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs
index f2ea9cef2..61ba3f353 100644
--- a/rust/processor/src/processors/mod.rs
+++ b/rust/processor/src/processors/mod.rs
@@ -39,8 +39,11 @@ use self::{
 use crate::{
     db::common::models::processor_status::ProcessorStatus,
     gap_detectors::ProcessingResult,
-    processors::parquet_processors::parquet_default_processor::{
-        ParquetDefaultProcessor, ParquetDefaultProcessorConfig,
+    processors::parquet_processors::{
+        parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
+        parquet_fungible_asset_processor::{
+            ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
+        },
     },
     schema::processor_status,
     utils::{
@@ -196,6 +199,7 @@ pub enum ProcessorConfig {
     TransactionMetadataProcessor,
     UserTransactionProcessor,
     ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
+    ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
 }
 
 impl ProcessorConfig {
@@ -206,7 +210,11 @@ impl ProcessorConfig {
     }
 
     pub fn is_parquet_processor(&self) -> bool {
-        matches!(self, ProcessorConfig::ParquetDefaultProcessor(_))
+        matches!(
+            self,
+            ProcessorConfig::ParquetDefaultProcessor(_)
+                | ProcessorConfig::ParquetFungibleAssetProcessor(_)
+        )
     }
 }
 
@@ -243,6 +251,7 @@ pub enum Processor {
     TransactionMetadataProcessor,
     UserTransactionProcessor,
     ParquetDefaultProcessor,
+    ParquetFungibleAssetProcessor,
 }
 
 #[cfg(test)]
diff --git a/rust/processor/src/processors/parquet_processors/mod.rs b/rust/processor/src/processors/parquet_processors/mod.rs
index 19ac59e09..0315d7062 100644
--- a/rust/processor/src/processors/parquet_processors/mod.rs
+++ b/rust/processor/src/processors/parquet_processors/mod.rs
@@ -2,6 +2,8 @@ use std::time::Duration;
 
 pub mod parquet_default_processor;
+pub mod parquet_fungible_asset_processor;
+
 pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
 
 pub trait UploadIntervalConfig {
diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs
new file mode 100644
index 000000000..31ebcd35a
--- /dev/null
+++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs
@@ -0,0 +1,277 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS};
+use crate::{
+    bq_analytics::{
+        create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
+        ParquetProcessingResult,
+    },
+    db::common::models::{
+        fungible_asset_models::{
+            parquet_coin_supply::CoinSupply,
+            parquet_v2_fungible_asset_balances::FungibleAssetBalance,
+        },
+        object_models::v2_object_utils::{
+            ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata,
+        },
+    },
+    gap_detectors::ProcessingResult,
+    processors::{ProcessorName, ProcessorTrait},
+    utils::{database::ArcDbPool, util::standardize_address},
+};
+use ahash::AHashMap;
+use anyhow::anyhow;
+use aptos_protos::transaction::v1::{write_set_change::Change, Transaction};
+use async_trait::async_trait;
+use chrono::NaiveDateTime;
+use kanal::AsyncSender;
+use serde::{Deserialize, Serialize};
+use std::{fmt::Debug, time::Duration};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct ParquetFungibleAssetProcessorConfig {
+    pub google_application_credentials: Option<String>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub parquet_handler_response_channel_size: usize,
+    pub max_buffer_size: usize,
+    pub parquet_upload_interval: u64,
+}
+
+impl UploadIntervalConfig for ParquetFungibleAssetProcessorConfig {
+    fn parquet_upload_interval_in_secs(&self) -> Duration {
+        Duration::from_secs(self.parquet_upload_interval)
+    }
+}
+
+pub struct ParquetFungibleAssetProcessor {
+    connection_pool: ArcDbPool,
+    coin_supply_sender: AsyncSender<ParquetDataGeneric<CoinSupply>>,
+    fungible_asset_balances_sender: AsyncSender<ParquetDataGeneric<FungibleAssetBalance>>,
+}
+
+impl ParquetFungibleAssetProcessor {
+    pub fn new(
+        connection_pool: ArcDbPool,
+        config: ParquetFungibleAssetProcessorConfig,
+        new_gap_detector_sender: AsyncSender<ProcessingResult>,
+    ) -> Self {
+        if let Some(credentials) = config.google_application_credentials.clone() {
+            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+        }
+
+        let coin_supply_sender = create_parquet_handler_loop::<CoinSupply>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetFungibleAssetProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        let fungible_asset_balances_sender = create_parquet_handler_loop::<FungibleAssetBalance>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetFungibleAssetProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+
+        Self {
+            connection_pool,
+            coin_supply_sender,
+            fungible_asset_balances_sender,
+        }
+    }
+}
+
+impl Debug for ParquetFungibleAssetProcessor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ParquetFungibleAssetProcessor {{ capacity of tsi channel: {:?}, capacity of es channel: {:?}}}",
+            &self.coin_supply_sender.capacity(),
+            &self.fungible_asset_balances_sender.capacity(),
+        )
+    }
+}
+
+#[async_trait]
+impl ProcessorTrait for ParquetFungibleAssetProcessor {
+    fn name(&self) -> &'static str {
+        ProcessorName::ParquetFungibleAssetProcessor.into()
+    }
+
+    async fn process_transactions(
+        &self,
+        transactions: Vec<Transaction>,
+        start_version: u64,
+        end_version: u64,
+        _: Option<u64>,
+    ) -> anyhow::Result<ProcessingResult> {
+        let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
+        let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();
+
+        let (fungible_asset_balances, coin_supply) =
+            parse_v2_coin(&transactions, &mut transaction_version_to_struct_count).await;
+
+        let parquet_coin_supply = ParquetDataGeneric {
+            data: coin_supply,
+            transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
+        };
+
+        self.coin_supply_sender
+            .send(parquet_coin_supply)
+            .await
+            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
+
+        let parquet_fungible_asset_balances = ParquetDataGeneric {
+            data: fungible_asset_balances,
+            transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
+        };
+
+        self.fungible_asset_balances_sender
+            .send(parquet_fungible_asset_balances)
+            .await
+            .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?;
+
+        Ok(ProcessingResult::ParquetProcessingResult(
+            ParquetProcessingResult {
+                start_version: start_version as i64,
+                end_version: end_version as i64,
+                last_transaction_timestamp: last_transaction_timestamp.clone(),
+                txn_version_to_struct_count: AHashMap::new(),
+            },
+        ))
+    }
+
+    fn connection_pool(&self) -> &ArcDbPool {
+        &self.connection_pool
+    }
+}
+
+async fn parse_v2_coin(
+    transactions: &[Transaction],
+    transaction_version_to_struct_count: &mut AHashMap<i64, i64>,
+) -> (Vec<FungibleAssetBalance>, Vec<CoinSupply>) {
+    let mut fungible_asset_balances = vec![];
+    let mut all_coin_supply = vec![];
+
+    // Get Metadata for fungible assets by object
+    let mut fungible_asset_object_helper: ObjectAggregatedDataMapping = AHashMap::new();
+
+    for txn in transactions {
+        let txn_version = txn.version as i64;
+        let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!");
+        let txn_timestamp = txn
+            .timestamp
+            .as_ref()
+            .expect("Transaction timestamp doesn't exist!")
+            .seconds;
+        #[allow(deprecated)]
+        let txn_timestamp =
+            NaiveDateTime::from_timestamp_opt(txn_timestamp, 0).expect("Txn Timestamp is invalid!");
+
+        // First loop to get all objects
+        // Need to do a first pass to get all the objects
+        for wsc in transaction_info.changes.iter() {
+            if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() {
+                if let Some(object) =
+                    ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap()
+                {
+                    fungible_asset_object_helper.insert(
+                        standardize_address(&wr.address.to_string()),
+                        ObjectAggregatedData {
+                            object,
+                            ..ObjectAggregatedData::default()
+                        },
+                    );
+                }
+            }
+        }
+
+        for (index, wsc) in transaction_info.changes.iter().enumerate() {
+            if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() {
+                if let Some((balance, _, _)) = FungibleAssetBalance::get_v1_from_write_resource(
+                    write_resource,
+                    index as i64,
+                    txn_version,
+                    txn_timestamp,
+                )
+                .unwrap()
+                {
+                    fungible_asset_balances.push(balance);
+                    transaction_version_to_struct_count
+                        .entry(txn_version)
+                        .and_modify(|e| *e += 1)
+                        .or_insert(1);
+                }
+            } else if let Change::DeleteResource(delete_resource) = wsc.change.as_ref().unwrap() {
+                if let Some((balance, _, _)) = FungibleAssetBalance::get_v1_from_delete_resource(
+                    delete_resource,
+                    index as i64,
+                    txn_version,
+                    txn_timestamp,
+                )
+                .unwrap()
+                {
+                    fungible_asset_balances.push(balance);
+                    transaction_version_to_struct_count
+                        .entry(txn_version)
+                        .and_modify(|e| *e += 1)
+                        .or_insert(1);
+                }
+            }
+        }
+
+        // Loop to handle all the other changes
+        for (index, wsc) in transaction_info.changes.iter().enumerate() {
+            match wsc.change.as_ref().unwrap() {
+                Change::WriteResource(write_resource) => {
+                    if let Some((balance, _)) = FungibleAssetBalance::get_v2_from_write_resource(
+                        write_resource,
+                        index as i64,
+                        txn_version,
+                        txn_timestamp,
+                        &fungible_asset_object_helper,
+                    )
+                    .await
+                    .unwrap_or_else(|e| {
+                        tracing::error!(
+                            transaction_version = txn_version,
+                            index = index,
+                            error = ?e,
+                            "[Parser] error parsing fungible balance v2");
+                        panic!("[Parser] error parsing fungible balance v2");
+                    }) {
+                        fungible_asset_balances.push(balance);
+                        transaction_version_to_struct_count
+                            .entry(txn_version)
+                            .and_modify(|e| *e += 1)
+                            .or_insert(1);
+                    }
+                },
+                Change::WriteTableItem(table_item) => {
+                    if let Some(coin_supply) =
+                        CoinSupply::from_write_table_item(table_item, txn_version, txn_timestamp)
+                            .unwrap()
+                    {
+                        all_coin_supply.push(coin_supply);
+                        transaction_version_to_struct_count
+                            .entry(txn_version)
+                            .and_modify(|e| *e += 1)
+                            .or_insert(1);
+                    }
+                },
+                _ => {},
+            }
+        }
+    }
+
+    (fungible_asset_balances, all_coin_supply)
+}
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index 0e59df4c9..312269c09 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -10,17 +10,25 @@ use crate::{
     },
     grpc_stream::TransactionsPBResponse,
     processors::{
-        account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
-        coin_processor::CoinProcessor, default_processor::DefaultProcessor,
-        events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
-        monitoring_processor::MonitoringProcessor, nft_metadata_processor::NftMetadataProcessor,
+        account_transactions_processor::AccountTransactionsProcessor,
+        ans_processor::AnsProcessor,
+        coin_processor::CoinProcessor,
+        default_processor::DefaultProcessor,
+        events_processor::EventsProcessor,
+        fungible_asset_processor::FungibleAssetProcessor,
+        monitoring_processor::MonitoringProcessor,
+        nft_metadata_processor::NftMetadataProcessor,
         objects_processor::ObjectsProcessor,
-        parquet_processors::parquet_default_processor::ParquetDefaultProcessor,
-        stake_processor::StakeProcessor, token_processor::TokenProcessor,
+        parquet_processors::{
+            parquet_default_processor::ParquetDefaultProcessor,
+            parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
+        },
+        stake_processor::StakeProcessor,
+        token_processor::TokenProcessor,
         token_v2_processor::TokenV2Processor,
         transaction_metadata_processor::TransactionMetadataProcessor,
-        user_transaction_processor::UserTransactionProcessor, DefaultProcessingResult, Processor,
-        ProcessorConfig, ProcessorTrait,
+        user_transaction_processor::UserTransactionProcessor,
+        DefaultProcessingResult, Processor, ProcessorConfig, ProcessorTrait,
     },
     schema::ledger_infos,
     transaction_filter::TransactionFilter,
@@ -914,5 +922,12 @@ pub fn build_processor(
             gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
         ))
     },
+        ProcessorConfig::ParquetFungibleAssetProcessor(config) => {
+            Processor::from(ParquetFungibleAssetProcessor::new(
+                db_pool,
+                config.clone(),
+                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
+            ))
+        },
     }
 }
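
The new processor plugs in the same way as the existing parquet default processor: the ProcessorConfig::ParquetFungibleAssetProcessor variant added above carries a ParquetFungibleAssetProcessorConfig, and build_processor hands it to ParquetFungibleAssetProcessor::new together with the DB pool and the gap detector sender. A rough construction sketch, assuming the crate's items are in scope as in worker.rs; the field values are illustrative placeholders, not recommended defaults:

    // Sketch only: mirrors what build_processor does for this variant.
    let config = ParquetFungibleAssetProcessorConfig {
        // If None, the processor does not set GOOGLE_APPLICATION_CREDENTIALS itself.
        google_application_credentials: None,
        bucket_name: "example-parquet-bucket".to_string(), // hypothetical GCS bucket
        bucket_root: "fungible_assets".to_string(),        // hypothetical object prefix
        parquet_handler_response_channel_size: 100,
        max_buffer_size: 100 * 1024 * 1024,
        parquet_upload_interval: 1800, // seconds, via parquet_upload_interval_in_secs()
    };
    let processor = ParquetFungibleAssetProcessor::new(db_pool, config, gap_detector_sender);

Each of the two create_parquet_handler_loop calls in new() then gets its own upload channel, so coin supply rows and fungible asset balance rows are buffered and flushed to parquet independently.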