diff --git a/rust/processor/src/db/common/models/account_transaction_models/mod.rs b/rust/processor/src/db/common/models/account_transaction_models/mod.rs new file mode 100644 index 000000000..131bc6b22 --- /dev/null +++ b/rust/processor/src/db/common/models/account_transaction_models/mod.rs @@ -0,0 +1,4 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod raw_account_transactions; diff --git a/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs b/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs new file mode 100644 index 000000000..fb43b46f4 --- /dev/null +++ b/rust/processor/src/db/common/models/account_transaction_models/raw_account_transactions.rs @@ -0,0 +1,107 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + db::postgres::models::{ + object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource, + user_transactions_models::user_transactions::UserTransaction, + }, + utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, +}; +use ahash::AHashSet; +use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; +use serde::{Deserialize, Serialize}; + +pub type AccountTransactionPK = (String, i64); + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawAccountTransaction { + pub transaction_version: i64, + pub account_address: String, + pub block_timestamp: chrono::NaiveDateTime, +} + +impl RawAccountTransaction { + /// This table will record every transaction that touch an account which could be + /// a user account, an object, or a resource account. + /// We will consider all transactions that modify a resource or event associated with a particular account. 
+ /// We will do 1 level of redirection for now (e.g. if it's an object, we will record the owner as account address). + /// We will also consider transactions that the account signed or is part of a multi sig / multi agent. + /// TODO: recursively find the parent account of an object + /// TODO: include table items in the detection path + pub fn get_accounts(transaction: &Transaction) -> AHashSet { + let txn_version = transaction.version as i64; + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["AccountTransaction"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return AHashSet::new(); + }, + }; + let transaction_info = transaction.info.as_ref().unwrap_or_else(|| { + panic!("Transaction info doesn't exist for version {}", txn_version) + }); + let wscs = &transaction_info.changes; + let (events, signatures) = match txn_data { + TxnData::User(inner) => ( + &inner.events, + UserTransaction::get_signatures( + inner.request.as_ref().unwrap_or_else(|| { + panic!("User request doesn't exist for version {}", txn_version) + }), + txn_version, + transaction.block_height as i64, + ), + ), + TxnData::Genesis(inner) => (&inner.events, vec![]), + TxnData::BlockMetadata(inner) => (&inner.events, vec![]), + TxnData::Validator(inner) => (&inner.events, vec![]), + _ => { + return AHashSet::new(); + }, + }; + let mut accounts = AHashSet::new(); + for sig in signatures { + accounts.insert(sig.signer); + } + for event in events { + // Record event account address. We don't really have to worry about objects here + // because it'll be taken care of in the resource section. + accounts.insert(standardize_address( + event.key.as_ref().unwrap().account_address.as_str(), + )); + } + for wsc in wscs { + match wsc.change.as_ref().unwrap() { + Change::DeleteResource(res) => { + // Record resource account. 
+ // TODO: If the resource is an object, then we need to look for the latest + // owner. This isn't really possible right now given we have parallel threads + // so it'll be very difficult to ensure that we have the correct latest owner. + accounts.insert(standardize_address(res.address.as_str())); + }, + Change::WriteResource(res) => { + // Record resource account. If the resource is an object, then we record the + // owner as well. + // This handles partial deletes as well. + accounts.insert(standardize_address(res.address.as_str())); + if let Some(inner) = &ObjectWithMetadata::from_write_resource(res).unwrap() { + accounts.insert(inner.object_core.get_owner_address()); + } + }, + _ => {}, + } + } + accounts + } +} diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index e765a6281..613536313 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -1,3 +1,4 @@ +pub mod account_transaction_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; diff --git a/rust/processor/src/db/parquet/models/account_transaction_models/mod.rs b/rust/processor/src/db/parquet/models/account_transaction_models/mod.rs new file mode 100644 index 000000000..32f3eeb20 --- /dev/null +++ b/rust/processor/src/db/parquet/models/account_transaction_models/mod.rs @@ -0,0 +1,4 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod parquet_account_transactions; diff --git a/rust/processor/src/db/parquet/models/account_transaction_models/parquet_account_transactions.rs b/rust/processor/src/db/parquet/models/account_transaction_models/parquet_account_transactions.rs new file mode 100644 index 000000000..59bcef138 --- /dev/null +++ b/rust/processor/src/db/parquet/models/account_transaction_models/parquet_account_transactions.rs @@ -0,0 +1,42 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required 
because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, +}; +use allocative_derive::Allocative; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +pub type AccountTransactionPK = (String, i64); + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct AccountTransaction { + pub txn_version: i64, + pub account_address: String, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for AccountTransaction { + const TABLE_NAME: &'static str = "account_transactions"; +} + +impl HasVersion for AccountTransaction { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl GetTimeStamp for AccountTransaction { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index 78266bbf9..186fe7564 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -1,3 +1,4 @@ +pub mod account_transaction_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; diff --git a/rust/processor/src/db/postgres/models/account_transaction_models/account_transactions.rs b/rust/processor/src/db/postgres/models/account_transaction_models/account_transactions.rs index c911df689..bce79d77c 100644 --- a/rust/processor/src/db/postgres/models/account_transaction_models/account_transactions.rs +++ b/rust/processor/src/db/postgres/models/account_transaction_models/account_transactions.rs @@ -4,17 +4,9 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] - use crate::{ - db::postgres::models::{ - 
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource, - user_transactions_models::user_transactions::UserTransaction, - }, schema::account_transactions, - utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, }; -use ahash::AHashSet; -use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -27,84 +19,3 @@ pub struct AccountTransaction { pub transaction_version: i64, pub account_address: String, } - -impl AccountTransaction { - /// This table will record every transaction that touch an account which could be - /// a user account, an object, or a resource account. - /// We will consider all transactions that modify a resource or event associated with a particular account. - /// We will do 1 level of redirection for now (e.g. if it's an object, we will record the owner as account address). - /// We will also consider transactions that the account signed or is part of a multi sig / multi agent. 
- /// TODO: recursively find the parent account of an object - /// TODO: include table items in the detection path - pub fn get_accounts(transaction: &Transaction) -> AHashSet { - let txn_version = transaction.version as i64; - let txn_data = match transaction.txn_data.as_ref() { - Some(data) => data, - None => { - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["AccountTransaction"]) - .inc(); - tracing::warn!( - transaction_version = transaction.version, - "Transaction data doesn't exist", - ); - return AHashSet::new(); - }, - }; - let transaction_info = transaction.info.as_ref().unwrap_or_else(|| { - panic!("Transaction info doesn't exist for version {}", txn_version) - }); - let wscs = &transaction_info.changes; - let (events, signatures) = match txn_data { - TxnData::User(inner) => ( - &inner.events, - UserTransaction::get_signatures( - inner.request.as_ref().unwrap_or_else(|| { - panic!("User request doesn't exist for version {}", txn_version) - }), - txn_version, - transaction.block_height as i64, - ), - ), - TxnData::Genesis(inner) => (&inner.events, vec![]), - TxnData::BlockMetadata(inner) => (&inner.events, vec![]), - TxnData::Validator(inner) => (&inner.events, vec![]), - _ => { - return AHashSet::new(); - }, - }; - let mut accounts = AHashSet::new(); - for sig in signatures { - accounts.insert(sig.signer); - } - for event in events { - // Record event account address. We don't really have to worry about objects here - // because it'll be taken care of in the resource section. - accounts.insert(standardize_address( - event.key.as_ref().unwrap().account_address.as_str(), - )); - } - for wsc in wscs { - match wsc.change.as_ref().unwrap() { - Change::DeleteResource(res) => { - // Record resource account. - // TODO: If the resource is an object, then we need to look for the latest - // owner. This isn't really possible right now given we have parallel threads - // so it'll be very difficult to ensure that we have the correct latest owner. 
- accounts.insert(standardize_address(res.address.as_str())); - }, - Change::WriteResource(res) => { - // Record resource account. If the resource is an object, then we record the - // owner as well. - // This handles partial deletes as well. - accounts.insert(standardize_address(res.address.as_str())); - if let Some(inner) = &ObjectWithMetadata::from_write_resource(res).unwrap() { - accounts.insert(inner.object_core.get_owner_address()); - } - }, - _ => {}, - } - } - accounts - } -} diff --git a/rust/processor/src/processors/account_transactions_processor.rs b/rust/processor/src/processors/account_transactions_processor.rs index 6fd7f57d9..bd41a6d9f 100644 --- a/rust/processor/src/processors/account_transactions_processor.rs +++ b/rust/processor/src/processors/account_transactions_processor.rs @@ -3,7 +3,10 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - db::postgres::models::account_transaction_models::account_transactions::AccountTransaction, + db::{ + common::models::account_transaction_models::raw_account_transactions::RawAccountTransaction, + postgres::models::account_transaction_models::account_transactions::AccountTransaction, + }, gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, @@ -106,7 +109,7 @@ impl ProcessorTrait for AccountTransactionsProcessor { .into_par_iter() .map(|txn| { let transaction_version = txn.version as i64; - let accounts = AccountTransaction::get_accounts(&txn); + let accounts = RawAccountTransaction::get_accounts(&txn); accounts .into_iter() .map(|account_address| AccountTransaction { diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index a56ef0c19..d5bd8beb2 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -4,6 +4,7 @@ use super::{db_config::DbConfig, 
processor_config::ProcessorConfig}; use crate::{ parquet_processors::{ + parquet_account_transactions_processor::ParquetAccountTransactionsProcessor, parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, @@ -100,6 +101,11 @@ impl RunnableConfig for IndexerProcessorConfig { ParquetTransactionMetadataProcessor::new(self.clone()).await?; parquet_transaction_metadata_processor.run_processor().await }, + ProcessorConfig::ParquetAccountTransactionsProcessor(_) => { + let parquet_account_transactions_processor = + ParquetAccountTransactionsProcessor::new(self.clone()).await?; + parquet_account_transactions_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 8ef4bb152..639d2fb4b 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -9,6 +9,7 @@ use ahash::AHashMap; use processor::{ bq_analytics::generic_parquet_processor::NamedTable, db::parquet::models::{ + account_transaction_models::parquet_account_transactions::AccountTransaction, default_models::{ parquet_block_metadata_transactions::BlockMetadataTransaction, parquet_move_modules::MoveModule, @@ -81,6 +82,7 @@ pub enum ProcessorConfig { ParquetEventsProcessor(ParquetDefaultProcessorConfig), ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig), ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), + ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -99,6 +101,7 @@ impl ProcessorConfig { ProcessorConfig::ParquetDefaultProcessor(config) | ProcessorConfig::ParquetEventsProcessor(config) | ProcessorConfig::ParquetTransactionMetadataProcessor(config) + | ProcessorConfig::ParquetAccountTransactionsProcessor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) 
=> { // Get the processor name as a prefix let processor_name = self.name(); @@ -151,6 +154,9 @@ impl ProcessorConfig { ProcessorName::ParquetTransactionMetadataProcessor => { HashSet::from([WriteSetSize::TABLE_NAME.to_string()]) }, + ProcessorName::ParquetAccountTransactionsProcessor => { + HashSet::from([AccountTransaction::TABLE_NAME.to_string()]) + }, _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index df0ddb65d..270f734fd 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -13,6 +13,7 @@ use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClien use parquet::schema::types::Type; use processor::{ db::parquet::models::{ + account_transaction_models::parquet_account_transactions::AccountTransaction, default_models::{ parquet_block_metadata_transactions::BlockMetadataTransaction, parquet_move_modules::MoveModule, @@ -44,6 +45,7 @@ use std::{ }; use strum::{Display, EnumIter}; +pub mod parquet_account_transactions_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; @@ -90,6 +92,8 @@ pub enum ParquetTypeEnum { CurrentUnifiedFungibleAssetBalance, // txn metadata, WriteSetSize, + // account transactions + AccountTransactions, } /// Trait for handling various Parquet types. 
@@ -165,6 +169,7 @@ impl_parquet_trait!( ParquetTypeEnum::CurrentUnifiedFungibleAssetBalance ); impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize); +impl_parquet_trait!(AccountTransaction, ParquetTypeEnum::AccountTransactions); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -184,6 +189,7 @@ pub enum ParquetTypeStructs { CurrentFungibleAssetBalance(Vec), CurrentUnifiedFungibleAssetBalance(Vec), WriteSetSize(Vec), + AccountTransaction(Vec), } impl ParquetTypeStructs { @@ -216,6 +222,9 @@ impl ParquetTypeStructs { ParquetTypeStructs::CurrentUnifiedFungibleAssetBalance(Vec::new()) }, ParquetTypeEnum::WriteSetSize => ParquetTypeStructs::WriteSetSize(Vec::new()), + ParquetTypeEnum::AccountTransactions => { + ParquetTypeStructs::AccountTransaction(Vec::new()) + }, } } @@ -316,6 +325,12 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::AccountTransaction(self_data), + ParquetTypeStructs::AccountTransaction(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs new file mode 100644 index 000000000..4ae35ea16 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs @@ -0,0 +1,181 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + 
parquet_account_transactions_processor::parquet_account_transactions_extractor::ParquetAccountTransactionsExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::account_transaction_models::parquet_account_transactions::AccountTransaction, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetAccountTransactionsProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl ParquetAccountTransactionsProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for ParquetAccountTransactionsProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetAccountTransactionsProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = 
TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetAccountTransactionsProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetAccountTransactionsProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + info!("Starting version: {:?}", starting_version); + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_account_transactions_extractor = ParquetAccountTransactionsExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [( + ParquetTypeEnum::AccountTransactions, + AccountTransaction::schema(), + )] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + 
.unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to( + parquet_account_transactions_extractor.into_runnable_step(), + channel_size, + ) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/account_transactions_processor/account_transactions_extractor.rs b/rust/sdk-processor/src/steps/account_transactions_processor/account_transactions_extractor.rs index 926e523d3..8e95ebcce 100644 --- a/rust/sdk-processor/src/steps/account_transactions_processor/account_transactions_extractor.rs +++ b/rust/sdk-processor/src/steps/account_transactions_processor/account_transactions_extractor.rs @@ -5,7 +5,10 @@ use aptos_indexer_processor_sdk::{ utils::errors::ProcessorError, }; use async_trait::async_trait; -use processor::db::postgres::models::account_transaction_models::account_transactions::AccountTransaction; +use processor::db::{ + common::models::account_transaction_models::raw_account_transactions::RawAccountTransaction, + postgres::models::account_transaction_models::account_transactions::AccountTransaction, 
+}; use rayon::prelude::*; pub struct AccountTransactionsExtractor @@ -27,7 +30,7 @@ impl Processable for AccountTransactionsExtractor { .into_par_iter() .map(|txn| { let transaction_version = txn.version as i64; - let accounts = AccountTransaction::get_accounts(&txn); + let accounts = RawAccountTransaction::get_accounts(&txn); accounts .into_iter() .map(|account_address| AccountTransaction { diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 61dc1c397..77e1b4f62 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -9,6 +9,7 @@ pub mod stake_processor; pub mod token_v2_processor; pub mod user_transaction_processor; // parquet +pub mod parquet_account_transactions_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; diff --git a/rust/sdk-processor/src/steps/parquet_account_transactions_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_account_transactions_processor/mod.rs new file mode 100644 index 000000000..6046d1ef1 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_account_transactions_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_account_transactions_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_account_transactions_processor/parquet_account_transactions_extractor.rs b/rust/sdk-processor/src/steps/parquet_account_transactions_processor/parquet_account_transactions_extractor.rs new file mode 100644 index 000000000..4b0cf3301 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_account_transactions_processor/parquet_account_transactions_extractor.rs @@ -0,0 +1,99 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + 
types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use chrono::NaiveDateTime; +use processor::{ + db::{ + common::models::account_transaction_models::raw_account_transactions::RawAccountTransaction, + parquet::models::account_transaction_models::parquet_account_transactions::AccountTransaction, + }, + utils::table_flags::TableFlags, +}; +use rayon::prelude::*; +use std::collections::HashMap; +use tracing::debug; +pub struct ParquetAccountTransactionsExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetAccountTransactionsExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let acc_txns: Vec = transactions + .data + .into_par_iter() + .map(|txn| { + let transaction_version = txn.version as i64; + let txn_timestamp = txn + .timestamp + .as_ref() + .expect("Transaction timestamp doesn't exist!") + .seconds; + #[allow(deprecated)] + let block_timestamp = NaiveDateTime::from_timestamp_opt(txn_timestamp, 0) + .expect("Txn Timestamp is invalid!"); + + let accounts = RawAccountTransaction::get_accounts(&txn); + accounts + .into_iter() + .map(|account_address| AccountTransaction { + txn_version: transaction_version, + account_address, + block_timestamp, + }) + .collect() + }) + .collect::>>() + .into_iter() + .flatten() + .collect(); + // Print the size of each extracted data type + debug!("Processed data sizes:"); + debug!(" - AccountTransaction: {}", acc_txns.len()); + + let mut map: HashMap = HashMap::new(); + + // Array of tuples for each data type and its corresponding enum variant and flag + let data_types = [( + TableFlags::ACCOUNT_TRANSACTIONS, + ParquetTypeEnum::AccountTransactions, + 
ParquetTypeStructs::AccountTransaction(acc_txns), + )]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetAccountTransactionsExtractor {} + +impl NamedStep for ParquetAccountTransactionsExtractor { + fn name(&self) -> String { + "ParquetAccountTransactionsExtractor".to_string() + } +}