diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
index 5a6adea59..960ba682b 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs
@@ -377,6 +377,7 @@ impl Transaction {
             let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
                 Self::from_transaction(txn);
             txns.push(txn.clone());
+            // TODO: Remove once fully migrated
             transaction_version_to_struct_count
                 .entry(txn.txn_version)
                 .and_modify(|e| *e += 1)
@@ -386,6 +387,7 @@ impl Transaction {
                 block_metadata_txns.push(a.clone());
             }
 
+            // TODO: Remove once fully migrated
             if !wsc_list.is_empty() {
                 transaction_version_to_struct_count
                     .entry(txn.txn_version)
diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs
index 9c89dda25..5fce74e70 100644
--- a/rust/sdk-processor/src/parquet_processors/mod.rs
+++ b/rust/sdk-processor/src/parquet_processors/mod.rs
@@ -68,6 +68,55 @@ impl ParquetTypeStructs {
             ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
         }
     }
+
+    pub fn get_table_name(&self) -> &'static str {
+        match self {
+            ParquetTypeStructs::MoveResource(_) => "move_resources",
+            ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
+            ParquetTypeStructs::Transaction(_) => "parquet_transactions",
+            ParquetTypeStructs::TableItem(_) => "table_items",
+            ParquetTypeStructs::MoveModule(_) => "move_modules",
+        }
+    }
+}
+
+pub trait ParquetStruct {}
+
+impl ParquetStruct for MoveResource {}
+impl ParquetStruct for WriteSetChangeModel {}
+impl ParquetStruct for ParquetTransaction {}
+impl ParquetStruct for TableItem {}
+impl ParquetStruct for MoveModule {}
+
+impl ParquetTypeStructs {
+    pub fn get_type(&self) -> ParquetTypeEnum {
+        match self {
+            ParquetTypeStructs::MoveResource(_) => ParquetTypeEnum::MoveResource,
+            ParquetTypeStructs::WriteSetChange(_) => ParquetTypeEnum::WriteSetChange,
+            ParquetTypeStructs::Transaction(_) => ParquetTypeEnum::Transaction,
+            ParquetTypeStructs::TableItem(_) => ParquetTypeEnum::TableItem,
+            ParquetTypeStructs::MoveModule(_) => ParquetTypeEnum::MoveModule,
+        }
+    }
+
+    /// Get a vector of trait object references to the inner structs
+    pub fn get_structs(&self) -> Vec<&dyn ParquetStruct> {
+        match self {
+            ParquetTypeStructs::MoveResource(v) => {
+                v.iter().map(|s| s as &dyn ParquetStruct).collect()
+            },
+            ParquetTypeStructs::WriteSetChange(v) => {
+                v.iter().map(|s| s as &dyn ParquetStruct).collect()
+            },
+            ParquetTypeStructs::Transaction(v) => {
+                v.iter().map(|s| s as &dyn ParquetStruct).collect()
+            },
+            ParquetTypeStructs::TableItem(v) => v.iter().map(|s| s as &dyn ParquetStruct).collect(),
+            ParquetTypeStructs::MoveModule(v) => {
+                v.iter().map(|s| s as &dyn ParquetStruct).collect()
+            },
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs
index 6b2e05793..0e291f4b2 100644
--- a/rust/sdk-processor/src/steps/common/mod.rs
+++ b/rust/sdk-processor/src/steps/common/mod.rs
@@ -1,3 +1,4 @@
+pub mod parquet_extractor_helper;
 pub mod processor_status_saver;
 
 pub use processor_status_saver::get_processor_status_saver;
diff --git a/rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs b/rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs
new file mode 100644
index 000000000..cc7e31b41
--- /dev/null
+++ b/rust/sdk-processor/src/steps/common/parquet_extractor_helper.rs
@@ -0,0 +1,20 @@
+use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs};
+use std::collections::HashMap;
+
+/// Fill the map with data if the table is opted in for backfill-purpose
+pub fn add_to_map_if_opted_in_for_backfill(
+    opt_in_tables: &Option<Vec<String>>,
+    map: &mut HashMap<ParquetTypeEnum, ParquetTypeStructs>,
+    enum_type: ParquetTypeEnum,
+    data: ParquetTypeStructs,
+) {
+    if let Some(ref backfill_table) = opt_in_tables {
+        let table_name = enum_type.to_string();
+        if backfill_table.contains(&table_name) {
+            map.insert(enum_type, data);
+        }
+    } else {
+        // If there's no opt-in table, include all data
+        map.insert(enum_type, data);
+    }
+}
diff --git a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
index b4cd700db..e7428c839 100644
--- a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
+++ b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
@@ -13,7 +13,6 @@ use processor::{
     processors::default_processor::process_transactions,
     worker::TableFlags,
 };
-pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
 
 pub struct DefaultExtractor
 where
diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs
index 1714253e4..3dd4c9776 100644
--- a/rust/sdk-processor/src/steps/mod.rs
+++ b/rust/sdk-processor/src/steps/mod.rs
@@ -5,7 +5,9 @@ pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
 pub mod objects_processor;
+mod parquet_default_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
 pub mod user_transaction_processor;
+
 pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
new file mode 100644
index 000000000..35ba1ba68
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -0,0 +1 @@
+pub mod parquet_default_extractor;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
new file mode 100644
index 000000000..93dc7ac49
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
@@ -0,0 +1,146 @@
+use crate::{
+    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
+    steps::common::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill,
+};
+use ahash::AHashMap;
+use aptos_indexer_processor_sdk::{
+    aptos_protos::transaction::v1::Transaction,
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::db::common::models::default_models::{
+    parquet_move_modules::MoveModule,
+    parquet_move_resources::MoveResource,
+    parquet_move_tables::TableItem,
+    parquet_transactions::TransactionModel,
+    parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
+};
+use std::collections::HashMap;
+use tracing::debug;
+
+/// Extracts parquet data from transactions, allowing optional selection of specific tables.
+pub struct ParquetDefaultExtractor
+where
+    Self: Processable + Send + Sized + 'static,
+{
+    pub opt_in_tables: Option<Vec<String>>,
+}
+
+type ParquetTypeMap = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+
+#[async_trait]
+impl Processable for ParquetDefaultExtractor {
+    type Input = Vec<Transaction>;
+    type Output = ParquetTypeMap;
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        transactions: TransactionContext<Self::Input>,
+    ) -> anyhow::Result<Option<TransactionContext<ParquetTypeMap>>, ProcessorError> {
+        let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) =
+            process_transactions(transactions.data);
+
+        // Print the size of each extracted data type
+        debug!("Processed data sizes:");
+        debug!(" - MoveResources: {}", move_resources.len());
+        debug!(" - WriteSetChanges: {}", write_set_changes.len());
+        debug!(" - ParquetTransactions: {}", parquet_transactions.len());
+        debug!(" - TableItems: {}", table_items.len());
+        debug!(" - MoveModules: {}", move_modules.len());
+
+        let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
+
+        // Array of tuples for each data type and its corresponding enum variant
+        let data_types = [
+            (
+                ParquetTypeEnum::MoveResource,
+                ParquetTypeStructs::MoveResource(move_resources),
+            ),
+            (
+                ParquetTypeEnum::WriteSetChange,
+                ParquetTypeStructs::WriteSetChange(write_set_changes),
+            ),
+            (
+                ParquetTypeEnum::Transaction,
+                ParquetTypeStructs::Transaction(parquet_transactions),
+            ),
+            (
+                ParquetTypeEnum::TableItem,
+                ParquetTypeStructs::TableItem(table_items),
+            ),
+            (
+                ParquetTypeEnum::MoveModule,
+                ParquetTypeStructs::MoveModule(move_modules),
+            ),
+        ];
+
+        // Populate the map based on opt-in tables
+        for (enum_type, data) in data_types {
+            add_to_map_if_opted_in_for_backfill(&self.opt_in_tables, &mut map, enum_type, data);
+        }
+
+        debug!(
+            "Map populated with data for the following tables: {:?}",
+            map.keys().collect::<Vec<_>>()
+        );
+
+        Ok(Some(TransactionContext {
+            data: map,
+            metadata: transactions.metadata,
+        }))
+    }
+}
+
+pub fn process_transactions(
+    transactions: Vec<Transaction>,
+) -> (
+    Vec<MoveResource>,
+    Vec<WriteSetChangeModel>,
+    Vec<TransactionModel>,
+    Vec<TableItem>,
+    Vec<MoveModule>,
+) {
+    // this will be removed in the future.
+    let mut transaction_version_to_struct_count = AHashMap::new();
+    let (txns, _, write_set_changes, wsc_details) = TransactionModel::from_transactions(
+        &transactions,
+        &mut transaction_version_to_struct_count,
+    );
+
+    let mut move_modules = vec![];
+    let mut move_resources = vec![];
+    let mut table_items = vec![];
+
+    for detail in wsc_details {
+        match detail {
+            WriteSetChangeDetail::Module(module) => {
+                move_modules.push(module);
+            },
+            WriteSetChangeDetail::Resource(resource) => {
+                move_resources.push(resource);
+            },
+            WriteSetChangeDetail::Table(item, _, _) => {
+                table_items.push(item);
+            },
+        }
+    }
+
+    (
+        move_resources,
+        write_set_changes,
+        txns,
+        table_items,
+        move_modules,
+    )
+}
+
+impl AsyncStep for ParquetDefaultExtractor {}
+
+impl NamedStep for ParquetDefaultExtractor {
+    fn name(&self) -> String {
+        "ParquetDefaultExtractor".to_string()
+    }
+}