Add an SDK-based ObjectsProcessor to rust/sdk-processor.

Summary of what this patch does:
  * rust/processor: makes insert_objects_query and insert_current_objects_query
    public so that the SDK storer step can reuse them.
  * rust/sdk-processor: adds the ObjectsProcessor config variant and runner,
    plus two pipeline steps: ObjectsExtractor (builds Object / CurrentObject
    rows from write set changes) and ObjectsStorer (writes them in chunks).
  * Moves MIN_TRANSACTIONS_PER_RAYON_JOB from the events extractor into
    steps/mod.rs.

diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs
index 4e8e6bea5..bd0117acc 100644
--- a/rust/processor/src/processors/objects_processor.rs
+++ b/rust/processor/src/processors/objects_processor.rs
@@ -106,7 +106,7 @@ async fn insert_to_db(
     Ok(())
 }
 
-fn insert_objects_query(
+pub fn insert_objects_query(
     items_to_insert: Vec<Object>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -123,7 +123,7 @@ fn insert_objects_query(
     )
 }
 
-fn insert_current_objects_query(
+pub fn insert_current_objects_query(
     items_to_insert: Vec<CurrentObject>,
 ) -> (
     impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
@@ -164,7 +164,7 @@ impl ProcessorTrait for ObjectsProcessor {
         end_version: u64,
         _: Option<u64>,
     ) -> anyhow::Result<ProcessingResult> {
-        let processing_start = std::time::Instant::now();
+        let processing_start: std::time::Instant = std::time::Instant::now();
         let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
 
         let mut conn = self.get_conn().await;
diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs
index 9a692ea13..ae403d8ed 100644
--- a/rust/sdk-processor/src/config/indexer_processor_config.rs
+++ b/rust/sdk-processor/src/config/indexer_processor_config.rs
@@ -7,8 +7,8 @@ use crate::{
     processors::{
         account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
         default_processor::DefaultProcessor, events_processor::EventsProcessor,
-        fungible_asset_processor::FungibleAssetProcessor, stake_processor::StakeProcessor,
-        token_v2_processor::TokenV2Processor,
+        fungible_asset_processor::FungibleAssetProcessor, objects_processor::ObjectsProcessor,
+        stake_processor::StakeProcessor, token_v2_processor::TokenV2Processor,
     },
 };
 use anyhow::Result;
@@ -63,6 +63,10 @@ impl RunnableConfig for IndexerProcessorConfig {
                 let token_v2_processor = TokenV2Processor::new(self.clone()).await?;
                 token_v2_processor.run_processor().await
             },
+            ProcessorConfig::ObjectsProcessor(_) => {
+                let objects_processor = ObjectsProcessor::new(self.clone()).await?;
+                objects_processor.run_processor().await
+            },
             ProcessorConfig::ParquetDefaultProcessor(_) => {
                 let parquet_default_processor = ParquetDefaultProcessor::new(self.clone()).await?;
                 parquet_default_processor.run_processor().await
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index 9b5151118..0f8375529 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -1,6 +1,6 @@
 use crate::processors::{
-    ans_processor::AnsProcessorConfig, stake_processor::StakeProcessorConfig,
-    token_v2_processor::TokenV2ProcessorConfig,
+    ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
+    stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
 };
 use ahash::AHashMap;
 use serde::{Deserialize, Serialize};
@@ -45,6 +45,7 @@ pub enum ProcessorConfig {
     FungibleAssetProcessor(DefaultProcessorConfig),
     StakeProcessor(StakeProcessorConfig),
     TokenV2Processor(TokenV2ProcessorConfig),
+    ObjectsProcessor(ObjectsProcessorConfig),
     // ParquetProcessor
     ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
 }
diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs
index 31935d6b4..281a13544 100644
--- a/rust/sdk-processor/src/processors/mod.rs
+++ b/rust/sdk-processor/src/processors/mod.rs
@@ -3,5 +3,6 @@ pub mod ans_processor;
 pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
+pub mod objects_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs
new file mode 100644
index 000000000..d69945dd3
--- /dev/null
+++ b/rust/sdk-processor/src/processors/objects_processor.rs
@@ -0,0 +1,161 @@
+use crate::{
+    config::{
+        db_config::DbConfig,
+        indexer_processor_config::{
+            IndexerProcessorConfig, QUERY_DEFAULT_RETRIES, QUERY_DEFAULT_RETRY_DELAY_MS,
+        },
+        processor_config::{DefaultProcessorConfig, ProcessorConfig},
+    },
+    steps::{
+        common::get_processor_status_saver,
+        objects_processor::{objects_extractor::ObjectsExtractor, objects_storer::ObjectsStorer},
+    },
+    utils::{
+        chain_id::check_or_update_chain_id,
+        database::{new_db_pool, run_migrations, ArcDbPool},
+        starting_version::get_starting_version,
+    },
+};
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
+    builder::ProcessorBuilder,
+    common_steps::{
+        TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+    },
+    traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
+};
+use processor::worker::TableFlags;
+use serde::{Deserialize, Serialize};
+use tracing::{debug, info};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct ObjectsProcessorConfig {
+    #[serde(flatten)]
+    pub default_config: DefaultProcessorConfig,
+    #[serde(default = "ObjectsProcessorConfig::default_query_retries")]
+    pub query_retries: u32,
+    #[serde(default = "ObjectsProcessorConfig::default_query_retry_delay_ms")]
+    pub query_retry_delay_ms: u64,
+}
+
+impl ObjectsProcessorConfig {
+    pub const fn default_query_retries() -> u32 {
+        QUERY_DEFAULT_RETRIES
+    }
+
+    pub const fn default_query_retry_delay_ms() -> u64 {
+        QUERY_DEFAULT_RETRY_DELAY_MS
+    }
+}
+pub struct ObjectsProcessor {
+    pub config: IndexerProcessorConfig,
+    pub db_pool: ArcDbPool,
+}
+
+impl ObjectsProcessor {
+    pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
+        match config.db_config {
+            DbConfig::PostgresConfig(ref postgres_config) => {
+                let conn_pool = new_db_pool(
+                    &postgres_config.connection_string,
+                    Some(postgres_config.db_pool_size),
+                )
+                .await
+                .map_err(|e| {
+                    anyhow::anyhow!(
+                        "Failed to create connection pool for PostgresConfig: {:?}",
+                        e
+                    )
+                })?;
+
+                Ok(Self {
+                    config,
+                    db_pool: conn_pool,
+                })
+            },
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl ProcessorTrait for ObjectsProcessor {
+    fn name(&self) -> &'static str {
+        self.config.processor_config.name()
+    }
+
+    async fn run_processor(&self) -> Result<()> {
+        // Run migrations
+        match self.config.db_config {
+            DbConfig::PostgresConfig(ref postgres_config) => {
+                run_migrations(
+                    postgres_config.connection_string.clone(),
+                    self.db_pool.clone(),
+                )
+                .await;
+            },
+        }
+
+        // Merge the starting version from config and the latest processed version from the DB
+        let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;
+
+        // Check and update the ledger chain id to ensure we're indexing the correct chain
+        let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
+            .await?
+            .get_chain_id()
+            .await?;
+        check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
+
+        let processor_config = match &self.config.processor_config {
+            ProcessorConfig::ObjectsProcessor(processor_config) => processor_config,
+            _ => return Err(anyhow::anyhow!("Processor config is wrong type")),
+        };
+        let channel_size = processor_config.default_config.channel_size;
+        let table_flags = TableFlags::from_set(&processor_config.default_config.deprecated_tables);
+        let per_table_chunk_sizes = &processor_config.default_config.per_table_chunk_sizes;
+
+        // Define processor steps
+        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
+            starting_version: Some(starting_version),
+            ..self.config.transaction_stream_config.clone()
+        })
+        .await?;
+        let objects_extractor = ObjectsExtractor::new(
+            processor_config.query_retries,
+            processor_config.query_retry_delay_ms,
+            self.db_pool.clone(),
+            table_flags,
+        );
+        let objects_storer =
+            ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone());
+
+        let version_tracker = VersionTrackerStep::new(
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+        );
+        // Connect processor steps together
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+            transaction_stream.into_runnable_step(),
+        )
+        .connect_to(objects_extractor.into_runnable_step(), channel_size)
+        .connect_to(objects_storer.into_runnable_step(), channel_size)
+        .connect_to(version_tracker.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
+
+        loop {
+            match buffer_receiver.recv().await {
+                Ok(txn_context) => {
+                    debug!(
+                        "Finished processing versions [{:?}, {:?}]",
+                        txn_context.metadata.start_version, txn_context.metadata.end_version,
+                    );
+                },
+                Err(e) => {
+                    info!("No more transactions in channel: {:?}", e);
+                    break Ok(());
+                },
+            }
+        }
+    }
+}
diff --git a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs
index bb444d986..10db8b945 100644
--- a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs
+++ b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs
@@ -1,4 +1,6 @@
-use crate::db::common::models::events_models::events::EventModel;
+use crate::{
+    db::common::models::events_models::events::EventModel, steps::MIN_TRANSACTIONS_PER_RAYON_JOB,
+};
 use aptos_indexer_processor_sdk::{
     aptos_protos::transaction::v1::{transaction::TxnData, Transaction},
     traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
@@ -9,8 +11,6 @@ use async_trait::async_trait;
 use rayon::prelude::*;
 use tracing::warn;
 
-pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
-
 pub struct EventsExtractor
 where
     Self: Sized + Send + 'static, {}
diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs
index a1632e818..f58f69ae2 100644
--- a/rust/sdk-processor/src/steps/mod.rs
+++ b/rust/sdk-processor/src/steps/mod.rs
@@ -4,5 +4,8 @@ pub mod common;
 pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
+pub mod objects_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
+
+pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
diff --git a/rust/sdk-processor/src/steps/objects_processor/mod.rs b/rust/sdk-processor/src/steps/objects_processor/mod.rs
new file mode 100644
index 000000000..1de68cb6a
--- /dev/null
+++ b/rust/sdk-processor/src/steps/objects_processor/mod.rs
@@ -0,0 +1,2 @@
+pub mod objects_extractor;
+pub mod objects_storer;
diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs
new file mode 100644
index 000000000..34581dac1
--- /dev/null
+++ b/rust/sdk-processor/src/steps/objects_processor/objects_extractor.rs
@@ -0,0 +1,181 @@
+use crate::utils::database::ArcDbPool;
+use ahash::AHashMap;
+use aptos_indexer_processor_sdk::{
+    aptos_protos::transaction::v1::{write_set_change::Change, Transaction},
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::{convert::standardize_address, errors::ProcessorError},
+};
+use async_trait::async_trait;
+use processor::{
+    db::common::models::object_models::{
+        v2_object_utils::{ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata},
+        v2_objects::{CurrentObject, Object},
+    },
+    worker::TableFlags,
+};
+
+/// Extracts objects and current objects from the write set changes of each transaction
+pub struct ObjectsExtractor
+where
+    Self: Sized + Send + 'static,
+{
+    query_retries: u32,
+    query_retry_delay_ms: u64,
+    conn_pool: ArcDbPool,
+    deprecated_tables: TableFlags,
+}
+
+impl ObjectsExtractor {
+    pub fn new(
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+        conn_pool: ArcDbPool,
+        deprecated_tables: TableFlags,
+    ) -> Self {
+        Self {
+            query_retries,
+            query_retry_delay_ms,
+            conn_pool,
+            deprecated_tables,
+        }
+    }
+}
+
+#[async_trait]
+impl Processable for ObjectsExtractor {
+    type Input = Vec<Transaction>;
+    type Output = (Vec<Object>, Vec<CurrentObject>);
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        transactions: TransactionContext<Vec<Transaction>>,
+    ) -> Result<Option<TransactionContext<(Vec<Object>, Vec<CurrentObject>)>>, ProcessorError> {
+        let mut conn = self
+            .conn_pool
+            .get()
+            .await
+            .map_err(|e| ProcessorError::DBStoreError {
+                message: format!("Failed to get connection from pool: {:?}", e),
+                query: None,
+            })?;
+        let query_retries = self.query_retries;
+        let query_retry_delay_ms = self.query_retry_delay_ms;
+
+        // Moving object handling here because we need a single object
+        // map through transactions for lookups
+        let mut all_objects = vec![];
+        let mut all_current_objects = AHashMap::new();
+        let mut object_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new();
+
+        for txn in &transactions.data {
+            let txn_version = txn.version as i64;
+            let changes = &txn
+                .info
+                .as_ref()
+                .unwrap_or_else(|| {
+                    panic!(
+                        "Transaction info doesn't exist! Transaction {}",
+                        txn_version
+                    )
+                })
+                .changes;
+
+            // First pass to get all the object cores
+            for wsc in changes.iter() {
+                if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() {
+                    let address = standardize_address(&wr.address.to_string());
+                    if let Some(object_with_metadata) =
+                        ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap()
+                    {
+                        // Object core is the first struct that we need to get
+                        object_metadata_helper.insert(address.clone(), ObjectAggregatedData {
+                            object: object_with_metadata,
+                            token: None,
+                            fungible_asset_store: None,
+                            // The following structs are unused in this processor
+                            fungible_asset_metadata: None,
+                            aptos_collection: None,
+                            fixed_supply: None,
+                            unlimited_supply: None,
+                            concurrent_supply: None,
+                            property_map: None,
+                            transfer_events: vec![],
+                            untransferable: None,
+                            fungible_asset_supply: None,
+                            concurrent_fungible_asset_supply: None,
+                            concurrent_fungible_asset_balance: None,
+                            token_identifier: None,
+                        });
+                    }
+                }
+            }
+
+            // Second pass to construct the object data
+            for (index, wsc) in changes.iter().enumerate() {
+                let index: i64 = index as i64;
+                match wsc.change.as_ref().unwrap() {
+                    Change::WriteResource(inner) => {
+                        if let Some((object, current_object)) = &Object::from_write_resource(
+                            inner,
+                            txn_version,
+                            index,
+                            &object_metadata_helper,
+                        )
+                        .unwrap()
+                        {
+                            all_objects.push(object.clone());
+                            all_current_objects
+                                .insert(object.object_address.clone(), current_object.clone());
+                        }
+                    },
+                    Change::DeleteResource(inner) => {
+                        // Passing all_current_objects into the function so that we can get the owner of the deleted
+                        // resource if it was handled in the same batch
+                        if let Some((object, current_object)) = Object::from_delete_resource(
+                            inner,
+                            txn_version,
+                            index,
+                            &all_current_objects,
+                            &mut conn,
+                            query_retries,
+                            query_retry_delay_ms,
+                        )
+                        .await
+                        .unwrap()
+                        {
+                            all_objects.push(object.clone());
+                            all_current_objects
+                                .insert(object.object_address.clone(), current_object.clone());
+                        }
+                    },
+                    _ => {},
+                };
+            }
+        }
+
+        // Sort by PK
+        let mut all_current_objects = all_current_objects
+            .into_values()
+            .collect::<Vec<CurrentObject>>();
+        all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address));
+
+        if self.deprecated_tables.contains(TableFlags::OBJECTS) {
+            all_objects.clear();
+        }
+
+        Ok(Some(TransactionContext {
+            data: (all_objects, all_current_objects),
+            metadata: transactions.metadata,
+        }))
+    }
+}
+
+impl AsyncStep for ObjectsExtractor {}
+
+impl NamedStep for ObjectsExtractor {
+    fn name(&self) -> String {
+        "ObjectsExtractor".to_string()
+    }
+}
diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs b/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs
new file mode 100644
index 000000000..72ac5dacc
--- /dev/null
+++ b/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs
@@ -0,0 +1,89 @@
+use crate::utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool};
+use ahash::AHashMap;
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::{
+    self,
+    db::common::models::object_models::v2_objects::{CurrentObject, Object},
+    processors::objects_processor::{insert_current_objects_query, insert_objects_query},
+};
+
+pub struct ObjectsStorer
+where
+    Self: Sized + Send + 'static,
+{
+    conn_pool: ArcDbPool,
+    per_table_chunk_sizes: AHashMap<String, usize>,
+}
+
+impl ObjectsStorer {
+    pub fn new(conn_pool: ArcDbPool, per_table_chunk_sizes: AHashMap<String, usize>) -> Self {
+        Self {
+            conn_pool,
+            per_table_chunk_sizes,
+        }
+    }
+}
+
+#[async_trait]
+impl Processable for ObjectsStorer {
+    type Input = (Vec<Object>, Vec<CurrentObject>);
+    type Output = ();
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        input: TransactionContext<(Vec<Object>, Vec<CurrentObject>)>,
+    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
+        let (objects, current_objects) = input.data;
+
+        let io = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_objects_query,
+            &objects,
+            get_config_table_chunk_size::<Object>("objects", &self.per_table_chunk_sizes),
+        );
+        let co = execute_in_chunks(
+            self.conn_pool.clone(),
+            insert_current_objects_query,
+            &current_objects,
+            get_config_table_chunk_size::<CurrentObject>(
+                "current_objects",
+                &self.per_table_chunk_sizes,
+            ),
+        );
+        let (io_res, co_res) = tokio::join!(io, co);
+        for res in [io_res, co_res] {
+            match res {
+                Ok(_) => {},
+                Err(e) => {
+                    return Err(ProcessorError::DBStoreError {
+                        message: format!(
+                            "Failed to store versions {} to {}: {:?}",
+                            input.metadata.start_version, input.metadata.end_version, e,
+                        ),
+                        query: None,
+                    })
+                },
+            }
+        }
+
+        Ok(Some(TransactionContext {
+            data: (),
+            metadata: input.metadata,
+        }))
+    }
+}
+
+impl AsyncStep for ObjectsStorer {}
+
+impl NamedStep for ObjectsStorer {
+    fn name(&self) -> String {
+        "ObjectsStorer".to_string()
+    }
+}