diff --git a/rust/processor/src/processors/user_transaction_processor.rs b/rust/processor/src/processors/user_transaction_processor.rs index 52437e852..86544aadc 100644 --- a/rust/processor/src/processors/user_transaction_processor.rs +++ b/rust/processor/src/processors/user_transaction_processor.rs @@ -96,7 +96,7 @@ async fn insert_to_db( Ok(()) } -fn insert_user_transactions_query( +pub fn insert_user_transactions_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -116,7 +116,7 @@ fn insert_user_transactions_query( ) } -fn insert_signatures_query( +pub fn insert_signatures_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -153,40 +153,8 @@ impl ProcessorTrait for UserTransactionProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut signatures = vec![]; - let mut user_transactions = vec![]; - for txn in &transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let txn_data = match txn.txn_data.as_ref() { - Some(txn_data) => txn_data, - None => { - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["UserTransactionProcessor"]) - .inc(); - tracing::warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - continue; - }, - }; - if let TxnData::User(inner) = txn_data { - let (user_transaction, sigs) = UserTransactionModel::from_transaction( - inner, - txn.timestamp.as_ref().unwrap(), - block_height, - txn.epoch as i64, - txn_version, - ); - signatures.extend(sigs); - user_transactions.push(user_transaction); - } - } - - if self.deprecated_tables.contains(TableFlags::SIGNATURES) { - signatures.clear(); - } + let (user_transactions, signatures) = + user_transaction_parse(transactions, self.deprecated_tables); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -229,3 +197,46 @@ impl ProcessorTrait for UserTransactionProcessor { &self.connection_pool } } + +/// Helper function to parse user transactions and signatures from the transaction data. +pub fn user_transaction_parse( + transactions: Vec, + deprecated_tables: TableFlags, +) -> (Vec, Vec) { + let mut signatures = vec![]; + let mut user_transactions = vec![]; + for txn in transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let txn_data = match txn.txn_data.as_ref() { + Some(txn_data) => txn_data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["UserTransactionProcessor"]) + .inc(); + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + continue; + }, + }; + if let TxnData::User(inner) = txn_data { + let (user_transaction, sigs) = UserTransactionModel::from_transaction( + inner, + txn.timestamp.as_ref().unwrap(), + block_height, + txn.epoch as i64, + txn_version, + ); + signatures.extend(sigs); + user_transactions.push(user_transaction); + } + } + + if deprecated_tables.contains(TableFlags::SIGNATURES) { + signatures.clear(); + } + + (user_transactions, signatures) +} diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index ae403d8ed..21e171715 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -7,8 +7,10 @@ use crate::{ processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, - fungible_asset_processor::FungibleAssetProcessor, objects_processor::ObjectsProcessor, + fungible_asset_processor::FungibleAssetProcessor, + monitoring_processor::MonitoringProcessor, objects_processor::ObjectsProcessor, stake_processor::StakeProcessor, token_v2_processor::TokenV2Processor, + user_transaction_processor::UserTransactionProcessor, }, }; use anyhow::Result; @@ -55,10 +57,18 @@ impl RunnableConfig for IndexerProcessorConfig { let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?; fungible_asset_processor.run_processor().await }, + ProcessorConfig::UserTransactionProcessor(_) => { + let user_txns_processor = UserTransactionProcessor::new(self.clone()).await?; + user_txns_processor.run_processor().await + }, ProcessorConfig::StakeProcessor(_) => { let stake_processor = StakeProcessor::new(self.clone()).await?; stake_processor.run_processor().await }, + ProcessorConfig::MonitoringProcessor(_) => { + let monitoring_processor = MonitoringProcessor::new(self.clone()).await?; + monitoring_processor.run_processor().await + }, ProcessorConfig::TokenV2Processor(_) => { let token_v2_processor = TokenV2Processor::new(self.clone()).await?; token_v2_processor.run_processor().await diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 8e6c667ef..52ecc87c0 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -43,9 +43,11 @@ pub enum ProcessorConfig { DefaultProcessor(DefaultProcessorConfig), EventsProcessor(DefaultProcessorConfig), FungibleAssetProcessor(DefaultProcessorConfig), + UserTransactionProcessor(DefaultProcessorConfig), StakeProcessor(StakeProcessorConfig), TokenV2Processor(TokenV2ProcessorConfig), ObjectsProcessor(ObjectsProcessorConfig), + MonitoringProcessor(DefaultProcessorConfig), // ParquetProcessor ParquetDefaultProcessor(ParquetDefaultProcessorConfig), } diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs index 281a13544..8045a022d 100644 --- a/rust/sdk-processor/src/processors/mod.rs +++ b/rust/sdk-processor/src/processors/mod.rs @@ -3,6 +3,8 @@ pub mod ans_processor; pub mod default_processor; pub mod events_processor; pub mod fungible_asset_processor; +pub mod monitoring_processor; pub mod objects_processor; pub mod stake_processor; pub mod token_v2_processor; +pub mod user_transaction_processor; diff --git a/rust/sdk-processor/src/processors/monitoring_processor.rs b/rust/sdk-processor/src/processors/monitoring_processor.rs new file mode 100644 index 000000000..c18d81838 --- /dev/null +++ b/rust/sdk-processor/src/processors/monitoring_processor.rs @@ -0,0 +1,127 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + steps::common::get_processor_status_saver, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::get_starting_version, + }, +}; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{ + TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + }, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use tracing::{debug, info}; + +pub struct MonitoringProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl MonitoringProcessor { + pub async fn new(config: IndexerProcessorConfig) -> Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for MonitoringProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + /// This processor no-ops and is used for monitoring purposes. + async fn run_processor(&self) -> Result<()> { + // Run migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // Merge the starting version from config and the latest processed version from the DB + let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let processor_config = match self.config.processor_config.clone() { + ProcessorConfig::MonitoringProcessor(processor_config) => processor_config, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor config for MonitoringProcessor: {:?}", + self.config.processor_config + )) + }, + }; + let channel_size = processor_config.channel_size; + + // Define processor steps + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + let version_tracker = VersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(version_tracker.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Monitoring versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/processors/user_transaction_processor.rs b/rust/sdk-processor/src/processors/user_transaction_processor.rs new file mode 100644 index 000000000..73d08f5e3 --- /dev/null +++ b/rust/sdk-processor/src/processors/user_transaction_processor.rs @@ -0,0 +1,135 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + steps::{ + common::get_processor_status_saver, + user_transaction_processor::{UserTransactionExtractor, UserTransactionStorer}, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::get_starting_version, + }, +}; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{ + TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + }, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use processor::worker::TableFlags; +use tracing::{debug, info}; + +pub struct UserTransactionProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl UserTransactionProcessor { + pub async fn new(config: IndexerProcessorConfig) -> Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for UserTransactionProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> Result<()> { + // Run migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // Merge the starting version from config and the latest processed version from the DB + let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let processor_config = match self.config.processor_config.clone() { + ProcessorConfig::UserTransactionProcessor(processor_config) => processor_config, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor config for UserTransactionProcessor: {:?}", + self.config.processor_config + )) + }, + }; + let channel_size = processor_config.channel_size; + let deprecated_tables = TableFlags::from_set(&processor_config.deprecated_tables); + + // Define processor steps + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + let user_txn_extractor = UserTransactionExtractor::new(deprecated_tables); + let user_txn_storer = UserTransactionStorer::new(self.db_pool.clone(), processor_config); + let version_tracker = VersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(user_txn_extractor.into_runnable_step(), channel_size) + .connect_to(user_txn_storer.into_runnable_step(), channel_size) + .connect_to(version_tracker.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing user txns from versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index f58f69ae2..1714253e4 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -7,5 +7,5 @@ pub mod fungible_asset_processor; pub mod objects_processor; pub mod stake_processor; pub mod token_v2_processor; - +pub mod user_transaction_processor; pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64; diff --git a/rust/sdk-processor/src/steps/user_transaction_processor/mod.rs b/rust/sdk-processor/src/steps/user_transaction_processor/mod.rs new file mode 100644 index 000000000..794d04549 --- /dev/null +++ b/rust/sdk-processor/src/steps/user_transaction_processor/mod.rs @@ -0,0 +1,5 @@ +pub mod user_transaction_extractor; +pub mod user_transaction_storer; + +pub use user_transaction_extractor::UserTransactionExtractor; +pub use user_transaction_storer::UserTransactionStorer; diff --git a/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_extractor.rs b/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_extractor.rs new file mode 100644 index 000000000..a84e64b9c --- /dev/null +++ b/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_extractor.rs @@ -0,0 +1,56 @@ +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::common::models::user_transactions_models::{ + signatures::Signature, user_transactions::UserTransactionModel, + }, + processors::user_transaction_processor::user_transaction_parse, + worker::TableFlags, +}; +pub struct UserTransactionExtractor +where + Self: Sized + Send + 'static, +{ + deprecated_tables: TableFlags, +} + +impl UserTransactionExtractor { + pub fn new(deprecated_tables: TableFlags) -> Self { + Self { deprecated_tables } + } +} + +#[async_trait] +impl Processable for UserTransactionExtractor { + type Input = Vec; + type Output = (Vec, Vec); + type RunType = AsyncRunType; + + async fn process( + &mut self, + item: TransactionContext>, + ) -> Result< + Option, Vec)>>, + ProcessorError, + > { + let (user_transactions, signatures) = + user_transaction_parse(item.data, self.deprecated_tables); + Ok(Some(TransactionContext { + data: (user_transactions, signatures), + metadata: item.metadata, + })) + } +} + +impl AsyncStep for UserTransactionExtractor {} + +impl NamedStep for UserTransactionExtractor { + fn name(&self) -> String { + "UserTransactionExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_storer.rs b/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_storer.rs new file mode 100644 index 000000000..683e67a5b --- /dev/null +++ b/rust/sdk-processor/src/steps/user_transaction_processor/user_transaction_storer.rs @@ -0,0 +1,84 @@ +use crate::{ + config::processor_config::DefaultProcessorConfig, + utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::common::models::user_transactions_models::{ + signatures::Signature, user_transactions::UserTransactionModel, + }, + processors::user_transaction_processor::{ + insert_signatures_query, insert_user_transactions_query, + }, +}; +pub struct UserTransactionStorer +where + Self: Sized + Send + 'static, +{ + conn_pool: ArcDbPool, + processor_config: DefaultProcessorConfig, +} + +impl UserTransactionStorer { + pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self { + Self { + conn_pool, + processor_config, + } + } +} + +#[async_trait] +impl Processable for UserTransactionStorer { + type Input = (Vec, Vec); + type Output = (); + type RunType = AsyncRunType; + + async fn process( + &mut self, + input: TransactionContext<(Vec, Vec)>, + ) -> Result>, ProcessorError> { + let (user_txns, signatures) = input.data; + + let per_table_chunk_sizes: AHashMap = + self.processor_config.per_table_chunk_sizes.clone(); + + let ut_res = execute_in_chunks( + self.conn_pool.clone(), + insert_user_transactions_query, + &user_txns, + get_config_table_chunk_size::( + "user_transactions", + &per_table_chunk_sizes, + ), + ); + let s_res = execute_in_chunks( + self.conn_pool.clone(), + insert_signatures_query, + &signatures, + get_config_table_chunk_size::("signatures", &per_table_chunk_sizes), + ); + + futures::try_join!(ut_res, s_res)?; + + Ok(Some(TransactionContext { + data: (), + metadata: input.metadata, + })) + } +} + +impl AsyncStep for UserTransactionStorer {} + +impl NamedStep for UserTransactionStorer { + fn name(&self) -> String { + "UserTransactionStorer".to_string() + } +}