diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 97a80752a..ae403d8ed 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -7,8 +7,8 @@ use crate::{ processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, - fungible_asset_processor::FungibleAssetProcessor, stake_processor::StakeProcessor, - token_v2_processor::TokenV2Processor, + fungible_asset_processor::FungibleAssetProcessor, objects_processor::ObjectsProcessor, + stake_processor::StakeProcessor, token_v2_processor::TokenV2Processor, }, }; use anyhow::Result; diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs index bf4f16ea4..d69945dd3 100644 --- a/rust/sdk-processor/src/processors/objects_processor.rs +++ b/rust/sdk-processor/src/processors/objects_processor.rs @@ -113,6 +113,7 @@ impl ProcessorTrait for ObjectsProcessor { }; let channel_size = processor_config.default_config.channel_size; let table_flags = TableFlags::from_set(&processor_config.default_config.deprecated_tables); + let per_table_chunk_sizes = &processor_config.default_config.per_table_chunk_sizes; // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { @@ -126,7 +127,8 @@ impl ProcessorTrait for ObjectsProcessor { self.db_pool.clone(), table_flags, ); - let objects_storer = ObjectsStorer::new(self.db_pool.clone(), processor_config.clone()); + let objects_storer = + ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone()); let version_tracker = VersionTrackerStep::new( get_processor_status_saver(self.db_pool.clone(), self.config.clone()), diff --git a/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs b/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs index 5f8ccd6b5..50318a31a 100644 --- a/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs +++ b/rust/sdk-processor/src/steps/objects_processor/objects_storer.rs @@ -21,14 +21,14 @@ where Self: Sized + Send + 'static, { conn_pool: ArcDbPool, - processor_config: ObjectsProcessorConfig, + per_table_chunk_sizes: AHashMap, } impl ObjectsStorer { - pub fn new(conn_pool: ArcDbPool, processor_config: ObjectsProcessorConfig) -> Self { + pub fn new(conn_pool: ArcDbPool, per_table_chunk_sizes: AHashMap) -> Self { Self { conn_pool, - processor_config, + per_table_chunk_sizes, } } } @@ -45,23 +45,20 @@ impl Processable for ObjectsStorer { ) -> Result>, ProcessorError> { let (objects, current_objects) = input.data; - let per_table_chunk_sizes: AHashMap = self - .processor_config - .default_config - .per_table_chunk_sizes - .clone(); - let io = execute_in_chunks( self.conn_pool.clone(), insert_objects_query, &objects, - get_config_table_chunk_size::("objects", &per_table_chunk_sizes), + get_config_table_chunk_size::("objects", &self.per_table_chunk_sizes), ); let co = execute_in_chunks( self.conn_pool.clone(), insert_current_objects_query, ¤t_objects, - get_config_table_chunk_size::("current_objects", &per_table_chunk_sizes), + get_config_table_chunk_size::( + "current_objects", + &self.per_table_chunk_sizes, + ), ); let (io_res, co_res) = tokio::join!(io, co); for res in [io_res, co_res] {