Skip to content

Commit

Permalink
move per table chunk size to processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Oct 30, 2024
1 parent 68d9046 commit 2ae6f87
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
4 changes: 2 additions & 2 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
processors::{
account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
default_processor::DefaultProcessor, events_processor::EventsProcessor,
fungible_asset_processor::FungibleAssetProcessor, stake_processor::StakeProcessor,
token_v2_processor::TokenV2Processor,
fungible_asset_processor::FungibleAssetProcessor, objects_processor::ObjectsProcessor,
stake_processor::StakeProcessor, token_v2_processor::TokenV2Processor,
},
};
use anyhow::Result;
Expand Down
4 changes: 3 additions & 1 deletion rust/sdk-processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl ProcessorTrait for ObjectsProcessor {
};
let channel_size = processor_config.default_config.channel_size;
let table_flags = TableFlags::from_set(&processor_config.default_config.deprecated_tables);
let per_table_chunk_sizes = &processor_config.default_config.per_table_chunk_sizes;

// Define processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
Expand All @@ -126,7 +127,8 @@ impl ProcessorTrait for ObjectsProcessor {
self.db_pool.clone(),
table_flags,
);
let objects_storer = ObjectsStorer::new(self.db_pool.clone(), processor_config.clone());
let objects_storer =
ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone());

let version_tracker = VersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
Expand Down
19 changes: 8 additions & 11 deletions rust/sdk-processor/src/steps/objects_processor/objects_storer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ where
Self: Sized + Send + 'static,
{
conn_pool: ArcDbPool,
processor_config: ObjectsProcessorConfig,
per_table_chunk_sizes: AHashMap<String, usize>,
}

impl ObjectsStorer {
pub fn new(conn_pool: ArcDbPool, processor_config: ObjectsProcessorConfig) -> Self {
pub fn new(conn_pool: ArcDbPool, per_table_chunk_sizes: AHashMap<String, usize>) -> Self {
Self {
conn_pool,
processor_config,
per_table_chunk_sizes,
}
}
}
Expand All @@ -45,23 +45,20 @@ impl Processable for ObjectsStorer {
) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
let (objects, current_objects) = input.data;

let per_table_chunk_sizes: AHashMap<String, usize> = self
.processor_config
.default_config
.per_table_chunk_sizes
.clone();

let io = execute_in_chunks(
self.conn_pool.clone(),
insert_objects_query,
&objects,
get_config_table_chunk_size::<Object>("objects", &per_table_chunk_sizes),
get_config_table_chunk_size::<Object>("objects", &self.per_table_chunk_sizes),
);
let co = execute_in_chunks(
self.conn_pool.clone(),
insert_current_objects_query,
&current_objects,
get_config_table_chunk_size::<CurrentObject>("current_objects", &per_table_chunk_sizes),
get_config_table_chunk_size::<CurrentObject>(
"current_objects",
&self.per_table_chunk_sizes,
),
);
let (io_res, co_res) = tokio::join!(io, co);
for res in [io_res, co_res] {
Expand Down

0 comments on commit 2ae6f87

Please sign in to comment.