Skip to content

Commit

Permalink
Migrate objects processor to SDK (#574)
Browse files Browse the repository at this point in the history
* migrate objects

* move per table chunk size to processor

* fix lint;
  • Loading branch information
rtso authored Oct 30, 2024
1 parent 656ea32 commit 962b4a6
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 10 deletions.
6 changes: 3 additions & 3 deletions rust/processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn insert_to_db(
Ok(())
}

fn insert_objects_query(
pub fn insert_objects_query(
items_to_insert: Vec<Object>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -123,7 +123,7 @@ fn insert_objects_query(
)
}

fn insert_current_objects_query(
pub fn insert_current_objects_query(
items_to_insert: Vec<CurrentObject>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -164,7 +164,7 @@ impl ProcessorTrait for ObjectsProcessor {
end_version: u64,
_: Option<u64>,
) -> anyhow::Result<ProcessingResult> {
let processing_start = std::time::Instant::now();
let processing_start: std::time::Instant = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let mut conn = self.get_conn().await;
Expand Down
8 changes: 6 additions & 2 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
processors::{
account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
default_processor::DefaultProcessor, events_processor::EventsProcessor,
fungible_asset_processor::FungibleAssetProcessor, stake_processor::StakeProcessor,
token_v2_processor::TokenV2Processor,
fungible_asset_processor::FungibleAssetProcessor, objects_processor::ObjectsProcessor,
stake_processor::StakeProcessor, token_v2_processor::TokenV2Processor,
},
};
use anyhow::Result;
Expand Down Expand Up @@ -63,6 +63,10 @@ impl RunnableConfig for IndexerProcessorConfig {
let token_v2_processor = TokenV2Processor::new(self.clone()).await?;
token_v2_processor.run_processor().await
},
ProcessorConfig::ObjectsProcessor(_) => {
let objects_processor = ObjectsProcessor::new(self.clone()).await?;
objects_processor.run_processor().await
},
ProcessorConfig::ParquetDefaultProcessor(_) => {
let parquet_default_processor = ParquetDefaultProcessor::new(self.clone()).await?;
parquet_default_processor.run_processor().await
Expand Down
5 changes: 3 additions & 2 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::processors::{
ans_processor::AnsProcessorConfig, stake_processor::StakeProcessorConfig,
token_v2_processor::TokenV2ProcessorConfig,
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -45,6 +45,7 @@ pub enum ProcessorConfig {
FungibleAssetProcessor(DefaultProcessorConfig),
StakeProcessor(StakeProcessorConfig),
TokenV2Processor(TokenV2ProcessorConfig),
ObjectsProcessor(ObjectsProcessorConfig),
// ParquetProcessor
ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
}
Expand Down
1 change: 1 addition & 0 deletions rust/sdk-processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod ans_processor;
pub mod default_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
pub mod objects_processor;
pub mod stake_processor;
pub mod token_v2_processor;
161 changes: 161 additions & 0 deletions rust/sdk-processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use crate::{
config::{
db_config::DbConfig,
indexer_processor_config::{
IndexerProcessorConfig, QUERY_DEFAULT_RETRIES, QUERY_DEFAULT_RETRY_DELAY_MS,
},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
steps::{
common::get_processor_status_saver,
objects_processor::{objects_extractor::ObjectsExtractor, objects_storer::ObjectsStorer},
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::get_starting_version,
},
};
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{
TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use processor::worker::TableFlags;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectsProcessorConfig {
#[serde(flatten)]
pub default_config: DefaultProcessorConfig,
#[serde(default = "ObjectsProcessorConfig::default_query_retries")]
pub query_retries: u32,
#[serde(default = "ObjectsProcessorConfig::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

impl ObjectsProcessorConfig {
pub const fn default_query_retries() -> u32 {
QUERY_DEFAULT_RETRIES
}

pub const fn default_query_retry_delay_ms() -> u64 {
QUERY_DEFAULT_RETRY_DELAY_MS
}
}
pub struct ObjectsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
}

impl ObjectsProcessor {
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
match config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
let conn_pool = new_db_pool(
&postgres_config.connection_string,
Some(postgres_config.db_pool_size),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create connection pool for PostgresConfig: {:?}",
e
)
})?;

Ok(Self {
config,
db_pool: conn_pool,
})
},
}
}
}

#[async_trait::async_trait]
impl ProcessorTrait for ObjectsProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}

async fn run_processor(&self) -> Result<()> {
// Run migrations
match self.config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
run_migrations(
postgres_config.connection_string.clone(),
self.db_pool.clone(),
)
.await;
},
}

// Merge the starting version from config and the latest processed version from the DB
let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let processor_config = match &self.config.processor_config {
ProcessorConfig::ObjectsProcessor(processor_config) => processor_config,
_ => return Err(anyhow::anyhow!("Processor config is wrong type")),
};
let channel_size = processor_config.default_config.channel_size;
let table_flags = TableFlags::from_set(&processor_config.default_config.deprecated_tables);
let per_table_chunk_sizes = &processor_config.default_config.per_table_chunk_sizes;

// Define processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
let objects_extractor = ObjectsExtractor::new(
processor_config.query_retries,
processor_config.query_retry_delay_ms,
self.db_pool.clone(),
table_flags,
);
let objects_storer =
ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone());

let version_tracker = VersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);
// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(objects_extractor.into_runnable_step(), channel_size)
.connect_to(objects_storer.into_runnable_step(), channel_size)
.connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing versions [{:?}, {:?}]",
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());
},
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::db::common::models::events_models::events::EventModel;
use crate::{
db::common::models::events_models::events::EventModel, steps::MIN_TRANSACTIONS_PER_RAYON_JOB,
};
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::{transaction::TxnData, Transaction},
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
Expand All @@ -9,8 +11,6 @@ use async_trait::async_trait;
use rayon::prelude::*;
use tracing::warn;

pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;

pub struct EventsExtractor
where
Self: Sized + Send + 'static, {}
Expand Down
3 changes: 3 additions & 0 deletions rust/sdk-processor/src/steps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ pub mod common;
pub mod default_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
pub mod objects_processor;
pub mod stake_processor;
pub mod token_v2_processor;

pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/steps/objects_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod objects_extractor;
pub mod objects_storer;
Loading

0 comments on commit 962b4a6

Please sign in to comment.