diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index 898d40234..fa86884df 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -2,3 +2,4 @@ pub mod ans_models; pub mod default_models; pub mod event_models; pub mod fungible_asset_models; +pub mod transaction_metadata_model; diff --git a/rust/processor/src/db/parquet/models/transaction_metadata_model/mod.rs b/rust/processor/src/db/parquet/models/transaction_metadata_model/mod.rs new file mode 100644 index 000000000..938cb5673 --- /dev/null +++ b/rust/processor/src/db/parquet/models/transaction_metadata_model/mod.rs @@ -0,0 +1,4 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod parquet_write_set_size_info; diff --git a/rust/processor/src/db/postgres/models/transaction_metadata_model/parquet_write_set_size_info.rs b/rust/processor/src/db/parquet/models/transaction_metadata_model/parquet_write_set_size_info.rs similarity index 100% rename from rust/processor/src/db/postgres/models/transaction_metadata_model/parquet_write_set_size_info.rs rename to rust/processor/src/db/parquet/models/transaction_metadata_model/parquet_write_set_size_info.rs diff --git a/rust/processor/src/db/postgres/models/transaction_metadata_model/mod.rs b/rust/processor/src/db/postgres/models/transaction_metadata_model/mod.rs index f8c57db60..a04e5184d 100644 --- a/rust/processor/src/db/postgres/models/transaction_metadata_model/mod.rs +++ b/rust/processor/src/db/postgres/models/transaction_metadata_model/mod.rs @@ -4,6 +4,3 @@ pub mod event_size_info; pub mod transaction_size_info; pub mod write_set_size_info; - -// parquet models -pub mod parquet_write_set_size_info; diff --git a/rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs index a513b6a76..74617652f 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs @@ -6,7 +6,7 @@ use crate::{ create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, ParquetProcessingResult, }, - db::postgres::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, + db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, gap_detectors::ProcessingResult, processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, utils::{database::ArcDbPool, util::parse_timestamp}, @@ -92,31 +92,8 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); - let mut write_set_sizes = vec![]; - - for txn in &transactions { - let txn_version = txn.version as i64; - let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); - let size_info = match txn.size_info.as_ref() { - Some(size_info) => size_info, - None => { - warn!(version = txn.version, "Transaction size info not found"); - continue; - }, - }; - for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() { - write_set_sizes.push(WriteSetSize::from_transaction_info( - write_set_size_info, - txn_version, - index as i64, - block_timestamp, - )); - transaction_version_to_struct_count - .entry(txn_version) - .and_modify(|e| *e += 1) - .or_insert(1); - } - } + let write_set_sizes = + process_transaction(transactions, &mut transaction_version_to_struct_count); let write_set_size_info_parquet_data = ParquetDataGeneric { data: write_set_sizes, @@ -143,3 +120,35 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor { &self.connection_pool } } + +pub fn process_transaction( + transactions: Vec, + transaction_version_to_struct_count: &mut AHashMap, +) -> Vec { + let mut write_set_sizes = vec![]; + + for txn in transactions { + let txn_version = txn.version as i64; + let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let size_info = match txn.size_info.as_ref() { + Some(size_info) => size_info, + None => { + warn!(version = txn.version, "Transaction size info not found"); + continue; + }, + }; + for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() { + write_set_sizes.push(WriteSetSize::from_transaction_info( + write_set_size_info, + txn_version, + index as i64, + block_timestamp, + )); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + } + } + write_set_sizes +} diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index f17f6c21e..a56ef0c19 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -7,6 +7,7 @@ use crate::{ parquet_default_processor::ParquetDefaultProcessor, parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, + parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, }, processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, @@ -94,6 +95,11 @@ impl RunnableConfig for IndexerProcessorConfig { ParquetFungibleAssetProcessor::new(self.clone()).await?; parquet_fungible_asset_processor.run_processor().await }, + ProcessorConfig::ParquetTransactionMetadataProcessor(_) => { + let parquet_transaction_metadata_processor = + ParquetTransactionMetadataProcessor::new(self.clone()).await?; + parquet_transaction_metadata_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index aeffb5979..8ef4bb152 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -26,6 +26,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, }; use serde::{Deserialize, Serialize}; @@ -79,6 +80,7 @@ pub enum ProcessorConfig { ParquetDefaultProcessor(ParquetDefaultProcessorConfig), ParquetEventsProcessor(ParquetDefaultProcessorConfig), ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig), + ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -96,6 +98,7 @@ impl ProcessorConfig { match self { ProcessorConfig::ParquetDefaultProcessor(config) | ProcessorConfig::ParquetEventsProcessor(config) + | ProcessorConfig::ParquetTransactionMetadataProcessor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) => { // Get the processor name as a prefix let processor_name = self.name(); @@ -145,6 +148,9 @@ impl ProcessorConfig { CurrentUnifiedFungibleAssetBalance::TABLE_NAME.to_string(), FungibleAssetMetadataModel::TABLE_NAME.to_string(), ]), + ProcessorName::ParquetTransactionMetadataProcessor => { + HashSet::from([WriteSetSize::TABLE_NAME.to_string()]) + }, _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 0d71fbf2f..dace04bdf 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -32,6 +32,7 @@ use processor::{ }, parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, + transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, }, postgres::models::ans_models::parquet_ans_lookup_v2::AnsPrimaryNameV2, }, @@ -50,6 +51,7 @@ pub mod parquet_ans_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_transaction_metadata_processor; const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; @@ -92,6 +94,8 @@ pub enum ParquetTypeEnum { FungibleAssetBalances, CurrentFungibleAssetBalances, CurrentUnifiedFungibleAssetBalances, + // txn metadata, + WriteSetSize, } /// Trait for handling various Parquet types. @@ -167,6 +171,7 @@ impl_parquet_trait!( CurrentUnifiedFungibleAssetBalance, ParquetTypeEnum::CurrentUnifiedFungibleAssetBalances ); +impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -186,6 +191,7 @@ pub enum ParquetTypeStructs { FungibleAssetBalance(Vec), CurrentFungibleAssetBalance(Vec), CurrentUnifiedFungibleAssetBalance(Vec), + WriteSetSize(Vec), } impl ParquetTypeStructs { @@ -218,6 +224,7 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentUnifiedFungibleAssetBalances => { ParquetTypeStructs::CurrentUnifiedFungibleAssetBalance(Vec::new()) }, + ParquetTypeEnum::WriteSetSize => ParquetTypeStructs::WriteSetSize(Vec::new()), } } @@ -318,6 +325,12 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::WriteSetSize(self_data), + ParquetTypeStructs::WriteSetSize(other_data), + ) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 9a4ea5f6c..ea64125b6 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -111,7 +111,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { processor_status_table_names, ) .await?; - println!("Starting version: {:?}", starting_version); // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { diff --git a/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs new file mode 100644 index 000000000..8587593cb --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs @@ -0,0 +1,178 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_transaction_metadata_processor::parquet_transaction_metadata_extractor::ParquetTransactionMetadataExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetTransactionMetadataProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl ParquetTransactionMetadataProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for ParquetTransactionMetadataProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetTransactionMetadataProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetTransactionMetadataProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetTransactionMetadataProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_txn_metadata_extractor = ParquetTransactionMetadataExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap> = + [(ParquetTypeEnum::WriteSetSize, WriteSetSize::schema())] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to( + parquet_txn_metadata_extractor.into_runnable_step(), + channel_size, + ) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 747d5902d..61dc1c397 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -12,5 +12,6 @@ pub mod user_transaction_processor; pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; +pub mod parquet_transaction_metadata_processor; pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64; diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs index d12e4fe23..852fb3b05 100644 --- a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs @@ -39,7 +39,6 @@ impl Processable for ParquetEventsExtractor { let mut map: HashMap = HashMap::new(); - // Array of tuples for each data type and its corresponding enum variant and flag let data_types = [( TableFlags::EVENTS, ParquetTypeEnum::Events, diff --git a/rust/sdk-processor/src/steps/parquet_fungible_asset_processor/parquet_fa_extractor.rs b/rust/sdk-processor/src/steps/parquet_fungible_asset_processor/parquet_fa_extractor.rs index 720cedc7c..7c44a1950 100644 --- a/rust/sdk-processor/src/steps/parquet_fungible_asset_processor/parquet_fa_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_fungible_asset_processor/parquet_fa_extractor.rs @@ -119,7 +119,6 @@ impl Processable for ParquetFungibleAssetExtractor { let mut map: HashMap = HashMap::new(); - // Array of tuples for each data type and its corresponding enum variant and flag let data_types = [ ( TableFlags::FUNGIBLE_ASSET_ACTIVITIES, diff --git a/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/mod.rs new file mode 100644 index 000000000..af82ef3bf --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_transaction_metadata_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/parquet_transaction_metadata_extractor.rs b/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/parquet_transaction_metadata_extractor.rs new file mode 100644 index 000000000..07e720dee --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/parquet_transaction_metadata_extractor.rs @@ -0,0 +1,71 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use ahash::AHashMap; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + processors::parquet_processors::parquet_transaction_metadata_processor::process_transaction, + utils::table_flags::TableFlags, +}; +use std::collections::HashMap; +use tracing::debug; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetTransactionMetadataExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetTransactionMetadataExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); + let write_set_size = + process_transaction(transactions.data, &mut transaction_version_to_struct_count); + + debug!("Processed data sizes:"); + debug!(" - WriteSetSize: {}", write_set_size.len()); + + let mut map: HashMap = HashMap::new(); + + let data_types = [( + TableFlags::WRITE_SET_SIZE, + ParquetTypeEnum::WriteSetSize, + ParquetTypeStructs::WriteSetSize(write_set_size), + )]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetTransactionMetadataExtractor {} + +impl NamedStep for ParquetTransactionMetadataExtractor { + fn name(&self) -> String { + "ParquetTransactionMetadataExtractor".to_string() + } +}