diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs
index 661553ef8..86da47c4c 100644
--- a/rust/processor/src/processors/default_processor.rs
+++ b/rust/processor/src/processors/default_processor.rs
@@ -124,7 +124,7 @@ async fn insert_to_db(
     Ok(())
 }
 
-fn insert_block_metadata_transactions_query(
+pub fn insert_block_metadata_transactions_query(
     items_to_insert: Vec<BlockMetadataTransactionModel>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -141,7 +141,7 @@ fn insert_block_metadata_transactions_query(
     )
 }
 
-fn insert_table_items_query(
+pub fn insert_table_items_query(
     items_to_insert: Vec<TableItem>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -158,7 +158,7 @@ fn insert_table_items_query(
     )
 }
 
-fn insert_current_table_items_query(
+pub fn insert_current_table_items_query(
     items_to_insert: Vec<CurrentTableItem>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -183,7 +183,7 @@ fn insert_current_table_items_query(
     )
 }
 
-fn insert_table_metadata_query(
+pub fn insert_table_metadata_query(
     items_to_insert: Vec<TableMetadata>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -272,7 +272,27 @@ impl ProcessorTrait for DefaultProcessor {
     }
 }
 
-fn process_transactions(
+/// Processes a list of transactions and extracts relevant data into different models.
+///
+/// This function iterates over a list of transactions, extracting block metadata transactions,
+/// table items, current table items, and table metadata. It handles different types of
+/// transactions and write set changes, converting them into appropriate models. The function
+/// also sorts the extracted data to avoid PostgreSQL deadlocks during multi-threaded database
+/// writes.
+///
+/// # Arguments
+///
+/// * `transactions` - A vector of `Transaction` objects to be processed.
+/// * `flags` - A `TableFlags` object that determines which tables to clear after processing.
+///
+/// # Returns
+///
+/// A tuple containing:
+/// * `Vec<BlockMetadataTransactionModel>` - A vector of block metadata transaction models.
+/// * `Vec<TableItem>` - A vector of table items.
+/// * `Vec<CurrentTableItem>` - A vector of current table items, sorted by primary key.
+/// * `Vec<TableMetadata>` - A vector of table metadata, sorted by primary key.
+pub fn process_transactions(
     transactions: Vec<Transaction>,
     flags: TableFlags,
 ) -> (
diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs
index da054c4d4..1449407d4 100644
--- a/rust/sdk-processor/src/config/indexer_processor_config.rs
+++ b/rust/sdk-processor/src/config/indexer_processor_config.rs
@@ -3,8 +3,8 @@
 
 use super::{db_config::DbConfig, processor_config::ProcessorConfig};
 use crate::processors::{
-    events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
-    token_v2_processor::TokenV2Processor,
+    default_processor::DefaultProcessor, events_processor::EventsProcessor,
+    fungible_asset_processor::FungibleAssetProcessor, token_v2_processor::TokenV2Processor,
 };
 use anyhow::Result;
 use aptos_indexer_processor_sdk::{
@@ -38,6 +38,10 @@ impl RunnableConfig for IndexerProcessorConfig {
                 let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
                 fungible_asset_processor.run_processor().await
             },
+            ProcessorConfig::DefaultProcessor(_) => {
+                let default_processor = DefaultProcessor::new(self.clone()).await?;
+                default_processor.run_processor().await
+            },
             ProcessorConfig::TokenV2Processor(_) => {
                 let token_v2_processor = TokenV2Processor::new(self.clone()).await?;
                 token_v2_processor.run_processor().await
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index 4dc93a7f0..51dba38cb 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -36,6 +36,7 @@ use std::collections::HashSet;
     strum(serialize_all = "snake_case")
 )]
 pub enum ProcessorConfig {
+    DefaultProcessor(DefaultProcessorConfig),
     EventsProcessor(DefaultProcessorConfig),
     FungibleAssetProcessor(DefaultProcessorConfig),
     TokenV2Processor(TokenV2ProcessorConfig),
diff --git a/rust/sdk-processor/src/processors/default_processor.rs b/rust/sdk-processor/src/processors/default_processor.rs
new file mode 100644
index 000000000..e27dd0d0b
--- /dev/null
+++ b/rust/sdk-processor/src/processors/default_processor.rs
@@ -0,0 +1,139 @@
+use crate::{
+    config::{
+        db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
+        processor_config::ProcessorConfig,
+    },
+    steps::{
+        common::get_processor_status_saver,
+        default_processor::{default_extractor::DefaultExtractor, default_storer::DefaultStorer},
+    },
+    utils::{
+        chain_id::check_or_update_chain_id,
+        database::{new_db_pool, run_migrations, ArcDbPool},
+        starting_version::get_starting_version,
+    },
+};
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
+    builder::ProcessorBuilder,
+    common_steps::{
+        TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+    },
+    traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
+};
+use async_trait::async_trait;
+use processor::worker::TableFlags;
+use tracing::{debug, info};
+
+pub struct DefaultProcessor {
+    pub config: IndexerProcessorConfig,
+    pub db_pool: ArcDbPool,
+}
+
+impl DefaultProcessor {
+    pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
+        match config.db_config {
+            DbConfig::PostgresConfig(ref postgres_config) => {
+                let conn_pool = new_db_pool(
+                    &postgres_config.connection_string,
+                    Some(postgres_config.db_pool_size),
+                )
+                .await
+                .map_err(|e| {
+                    anyhow::anyhow!(
+                        "Failed to create connection pool for PostgresConfig: {:?}",
+                        e
+                    )
+                })?;
+
+                Ok(Self {
+                    config,
+                    db_pool: conn_pool,
+                })
+            },
+        }
+    }
+}
+
+#[async_trait]
+impl ProcessorTrait for DefaultProcessor {
+    fn name(&self) -> &'static str {
+        self.config.processor_config.name()
+    }
+
+    async fn run_processor(&self) -> Result<()> {
+        // Run migrations
+        match self.config.db_config {
+            DbConfig::PostgresConfig(ref postgres_config) => {
+                run_migrations(
+                    postgres_config.connection_string.clone(),
+                    self.db_pool.clone(),
+                )
+                .await;
+            },
+        }
+
+        // Merge the starting version from config and the latest processed version from the DB
+        let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;
+
+        // Check and update the ledger chain id to ensure we're indexing the correct chain
+        let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
+            .await?
+            .get_chain_id()
+            .await?;
+        check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
+
+        let processor_config = match self.config.processor_config.clone() {
+            ProcessorConfig::DefaultProcessor(processor_config) => processor_config,
+            _ => {
+                return Err(anyhow::anyhow!(
+                    "Invalid processor config for DefaultProcessor: {:?}",
+                    self.config.processor_config
+                ))
+            },
+        };
+        let channel_size = processor_config.channel_size;
+        let deprecated_table_flags = TableFlags::from_set(&processor_config.deprecated_tables);
+
+        // Define processor steps
+        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
+            starting_version: Some(starting_version),
+            ..self.config.transaction_stream_config.clone()
+        })
+        .await?;
+        let default_extractor = DefaultExtractor {
+            deprecated_table_flags,
+        };
+        let default_storer = DefaultStorer::new(self.db_pool.clone(), processor_config);
+        let version_tracker = VersionTrackerStep::new(
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+        );
+
+        // Connect processor steps together
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+            transaction_stream.into_runnable_step(),
+        )
+        .connect_to(default_extractor.into_runnable_step(), channel_size)
+        .connect_to(default_storer.into_runnable_step(), channel_size)
+        .connect_to(version_tracker.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
+
+        // (Optional) Parse the results
+        loop {
+            match buffer_receiver.recv().await {
+                Ok(txn_context) => {
+                    debug!(
+                        "Finished processing versions [{:?}, {:?}]",
+                        txn_context.metadata.start_version, txn_context.metadata.end_version,
+                    );
+                },
+                Err(e) => {
+                    info!("No more transactions in channel: {:?}", e);
+                    break Ok(());
+                },
+            }
+        }
+    }
+}
diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs
index 25e0350dc..19584d705 100644
--- a/rust/sdk-processor/src/processors/mod.rs
+++ b/rust/sdk-processor/src/processors/mod.rs
@@ -1,3 +1,4 @@
+pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
 pub mod token_v2_processor;
diff --git a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
new file mode 100644
index 000000000..b4cd700db
--- /dev/null
+++ b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
@@ -0,0 +1,72 @@
+use aptos_indexer_processor_sdk::{
+    aptos_protos::transaction::v1::Transaction,
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::{
+    db::common::models::default_models::{
+        block_metadata_transactions::BlockMetadataTransactionModel,
+        move_tables::{CurrentTableItem, TableItem, TableMetadata},
+    },
+    processors::default_processor::process_transactions,
+    worker::TableFlags,
+};
+pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
+
+pub struct DefaultExtractor
+where
+    Self: Sized + Send + 'static,
+{
+    pub deprecated_table_flags: TableFlags,
+}
+
+#[async_trait]
+impl Processable for DefaultExtractor {
+    type Input = Vec<Transaction>;
+    type Output = (
+        Vec<BlockMetadataTransactionModel>,
+        Vec<TableItem>,
+        Vec<CurrentTableItem>,
+        Vec<TableMetadata>,
+    );
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        transactions: TransactionContext<Vec<Transaction>>,
+    ) -> Result<
+        Option<
+            TransactionContext<(
+                Vec<BlockMetadataTransactionModel>,
+                Vec<TableItem>,
+                Vec<CurrentTableItem>,
+                Vec<TableMetadata>,
+            )>,
+        >,
+        ProcessorError,
+    > {
+        let flags = self.deprecated_table_flags;
+        let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
+            process_transactions(transactions.data, flags);
+
+        Ok(Some(TransactionContext {
+            data: (
+                block_metadata_transactions,
+                table_items,
+                current_table_items,
+                table_metadata,
+            ),
+            metadata: transactions.metadata,
+        }))
+    }
+}
+
+impl AsyncStep for DefaultExtractor {}
+
+impl NamedStep for DefaultExtractor {
+    fn name(&self) -> String {
+        "DefaultExtractor".to_string()
+    }
+}
diff --git a/rust/sdk-processor/src/steps/default_processor/default_storer.rs b/rust/sdk-processor/src/steps/default_processor/default_storer.rs
new file mode 100644
index 000000000..a8cbe673b
--- /dev/null
+++ b/rust/sdk-processor/src/steps/default_processor/default_storer.rs
@@ -0,0 +1,141 @@
+use crate::{
+    config::processor_config::DefaultProcessorConfig,
+    utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
+};
+use ahash::AHashMap;
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::{
+    db::common::models::default_models::{
+        block_metadata_transactions::BlockMetadataTransactionModel,
+        move_tables::{CurrentTableItem, TableItem, TableMetadata},
+    },
+    processors::default_processor::{
+        insert_block_metadata_transactions_query, insert_current_table_items_query,
+        insert_table_items_query, insert_table_metadata_query,
+    },
+};
+
+pub struct DefaultStorer
+where
+    Self: Sized + Send + 'static,
+{
+    conn_pool: ArcDbPool,
+    processor_config: DefaultProcessorConfig,
+}
+
+impl DefaultStorer {
+    pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
+        Self {
+            conn_pool,
+            processor_config,
+        }
+    }
+}
+
+#[async_trait]
+impl Processable for DefaultStorer {
+    type Input = (
+        Vec<BlockMetadataTransactionModel>,
+        Vec<TableItem>,
+        Vec<CurrentTableItem>,
+        Vec<TableMetadata>,
+    );
+    type Output = ();
+    type RunType = AsyncRunType;
+
+    /// Processes a batch of transactions and inserts the extracted data into the database.
+    ///
+    /// This function takes a `TransactionContext` containing vectors of block metadata transactions,
+    /// table items, current table items, and table metadata. It processes these vectors by executing
+    /// database insertion operations in chunks to handle large datasets efficiently. The function
+    /// uses `futures::try_join!` to run the insertion operations concurrently and ensures that all
+    /// operations complete successfully.
+    ///
+    /// # Arguments
+    ///
+    /// * `input` - A `TransactionContext` containing:
+    ///   * `Vec<BlockMetadataTransactionModel>` - A vector of block metadata transaction models.
+    ///   * `Vec<TableItem>` - A vector of table items.
+    ///   * `Vec<CurrentTableItem>` - A vector of current table items.
+    ///   * `Vec<TableMetadata>` - A vector of table metadata.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<Option<TransactionContext<()>>, ProcessorError>` - Returns `Ok(Some(TransactionContext))`
+    ///   if all insertion operations complete successfully. Returns an error if any of the operations fail.
+    async fn process(
+        &mut self,
+        input: TransactionContext<(
+            Vec<BlockMetadataTransactionModel>,
+            Vec<TableItem>,
+            Vec<CurrentTableItem>,
+            Vec<TableMetadata>,
+        )>,
+    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
+        let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
+            input.data;
+
+        let per_table_chunk_sizes: AHashMap<String, usize> =
+            self.processor_config.per_table_chunk_sizes.clone();
+
+        let bmt_res = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_block_metadata_transactions_query,
+            &block_metadata_transactions,
+            get_config_table_chunk_size::<BlockMetadataTransactionModel>(
+                "block_metadata_transactions",
+                &per_table_chunk_sizes,
+            ),
+        );
+
+        let table_items_res = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_table_items_query,
+            &table_items,
+            get_config_table_chunk_size::<TableItem>("table_items", &per_table_chunk_sizes),
+        );
+
+        let current_table_items_res = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_current_table_items_query,
+            &current_table_items,
+            get_config_table_chunk_size::<CurrentTableItem>(
+                "current_table_items",
+                &per_table_chunk_sizes,
+            ),
+        );
+
+        let table_metadata_res = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_table_metadata_query,
+            &table_metadata,
+            get_config_table_chunk_size::<TableMetadata>("table_metadata", &per_table_chunk_sizes),
+        );
+
+        futures::try_join!(
+            bmt_res,
+            table_items_res,
+            current_table_items_res,
+            table_metadata_res
+        )?;
+
+        Ok(Some(TransactionContext {
+            data: (),
+            metadata: input.metadata,
+        }))
+    }
+}
+
+impl AsyncStep for DefaultStorer {}
+
+impl NamedStep for DefaultStorer {
+    fn name(&self) -> String {
+        "DefaultStorer".to_string()
+    }
+}
diff --git a/rust/sdk-processor/src/steps/default_processor/mod.rs b/rust/sdk-processor/src/steps/default_processor/mod.rs
new file mode 100644
index 000000000..c257b2538
--- /dev/null
+++ b/rust/sdk-processor/src/steps/default_processor/mod.rs
@@ -0,0 +1,2 @@
+pub mod default_extractor;
+pub mod default_storer;
diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs
index f11032381..501846e10 100644
--- a/rust/sdk-processor/src/steps/mod.rs
+++ b/rust/sdk-processor/src/steps/mod.rs
@@ -1,4 +1,5 @@
 pub mod common;
+pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
 pub mod token_v2_processor;