Skip to content

Commit

Permalink
Migrate Default Processor to the SDK (#567)
Browse files Browse the repository at this point in the history
* Init

* Implement storer

* correct function input

* Formatting + docstrings

* qualify async_trait

* Remove async block
  • Loading branch information
dermanyang authored Oct 29, 2024
1 parent 4d7849a commit 7b2873f
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 7 deletions.
30 changes: 25 additions & 5 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn insert_to_db(
Ok(())
}

fn insert_block_metadata_transactions_query(
pub fn insert_block_metadata_transactions_query(
items_to_insert: Vec<BlockMetadataTransactionModel>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -141,7 +141,7 @@ fn insert_block_metadata_transactions_query(
)
}

fn insert_table_items_query(
pub fn insert_table_items_query(
items_to_insert: Vec<TableItem>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -158,7 +158,7 @@ fn insert_table_items_query(
)
}

fn insert_current_table_items_query(
pub fn insert_current_table_items_query(
items_to_insert: Vec<CurrentTableItem>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -183,7 +183,7 @@ fn insert_current_table_items_query(
)
}

fn insert_table_metadata_query(
pub fn insert_table_metadata_query(
items_to_insert: Vec<TableMetadata>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -272,7 +272,27 @@ impl ProcessorTrait for DefaultProcessor {
}
}

fn process_transactions(
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
/// table items, current table items, and table metadata. It handles different types of
/// transactions and write set changes, converting them into appropriate models. The function
/// also sorts the extracted data to avoid PostgreSQL deadlocks during multi-threaded database
/// writes.
///
/// # Arguments
///
/// * `transactions` - A vector of `Transaction` objects to be processed.
/// * `flags` - A `TableFlags` object that determines which tables to clear after processing.
///
/// # Returns
///
/// A tuple containing:
/// * `Vec<BlockMetadataTransactionModel>` - A vector of block metadata transaction models.
/// * `Vec<TableItem>` - A vector of table items.
/// * `Vec<CurrentTableItem>` - A vector of current table items, sorted by primary key.
/// * `Vec<TableMetadata>` - A vector of table metadata, sorted by primary key.
pub fn process_transactions(
transactions: Vec<Transaction>,
flags: TableFlags,
) -> (
Expand Down
8 changes: 6 additions & 2 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::{
events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
token_v2_processor::TokenV2Processor,
default_processor::DefaultProcessor, events_processor::EventsProcessor,
fungible_asset_processor::FungibleAssetProcessor, token_v2_processor::TokenV2Processor,
};
use anyhow::Result;
use aptos_indexer_processor_sdk::{
Expand Down Expand Up @@ -38,6 +38,10 @@ impl RunnableConfig for IndexerProcessorConfig {
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
fungible_asset_processor.run_processor().await
},
ProcessorConfig::DefaultProcessor(_) => {
let default_processor = DefaultProcessor::new(self.clone()).await?;
default_processor.run_processor().await
},
ProcessorConfig::TokenV2Processor(_) => {
let token_v2_processor = TokenV2Processor::new(self.clone()).await?;
token_v2_processor.run_processor().await
Expand Down
1 change: 1 addition & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::collections::HashSet;
strum(serialize_all = "snake_case")
)]
pub enum ProcessorConfig {
DefaultProcessor(DefaultProcessorConfig),
EventsProcessor(DefaultProcessorConfig),
FungibleAssetProcessor(DefaultProcessorConfig),
TokenV2Processor(TokenV2ProcessorConfig),
Expand Down
139 changes: 139 additions & 0 deletions rust/sdk-processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
steps::{
common::get_processor_status_saver,
default_processor::{default_extractor::DefaultExtractor, default_storer::DefaultStorer},
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::get_starting_version,
},
};
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{
TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use async_trait::async_trait;
use processor::worker::TableFlags;
use tracing::{debug, info};

/// Processor that wires a transaction stream into the default extractor, the default
/// storer and a version tracker (see the `ProcessorTrait` implementation for this type).
pub struct DefaultProcessor {
/// Full indexer configuration this processor was constructed with.
pub config: IndexerProcessorConfig,
/// Database connection pool created in `DefaultProcessor::new` and cloned into each
/// component that needs database access.
pub db_pool: ArcDbPool,
}

impl DefaultProcessor {
    /// Builds a `DefaultProcessor` from the given indexer configuration.
    ///
    /// A database connection pool is opened using the connection string and pool size
    /// from the Postgres section of `config.db_config`. The pool and the configuration
    /// are then stored on the returned processor.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection pool cannot be created.
    pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
        // `PostgresConfig` is the only variant of `DbConfig`, so this pattern is irrefutable.
        let DbConfig::PostgresConfig(ref pg) = config.db_config;

        let pool_attempt = new_db_pool(&pg.connection_string, Some(pg.db_pool_size)).await;
        let db_pool = match pool_attempt {
            Ok(pool) => pool,
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "Failed to create connection pool for PostgresConfig: {:?}",
                    e
                ));
            },
        };

        Ok(Self { config, db_pool })
    }
}

#[async_trait]
impl ProcessorTrait for DefaultProcessor {
    /// Returns the name reported by the configured `ProcessorConfig`.
    fn name(&self) -> &'static str {
        self.config.processor_config.name()
    }

    /// Runs the processor until the output channel of the pipeline is closed.
    ///
    /// In order, this:
    /// 1. runs database migrations,
    /// 2. resolves the starting version from the config and the database,
    /// 3. fetches the chain id from the transaction stream and checks/updates it in the
    ///    database,
    /// 4. builds the pipeline `TransactionStreamStep -> DefaultExtractor -> DefaultStorer ->
    ///    VersionTrackerStep`, and
    /// 5. drains the pipeline output, logging each processed version range.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the setup steps fail, or if the processor config is not
    /// the `ProcessorConfig::DefaultProcessor` variant.
    async fn run_processor(&self) -> Result<()> {
        // Run migrations. `PostgresConfig` is the only `DbConfig` variant, so the pattern
        // below is irrefutable.
        let DbConfig::PostgresConfig(ref pg) = self.config.db_config;
        run_migrations(pg.connection_string.clone(), self.db_pool.clone()).await;

        // Merge the starting version from config and the latest processed version from the DB
        let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

        // Check and update the ledger chain id to ensure we're indexing the correct chain.
        // The stream is kept as a temporary so it is dropped as soon as the id is fetched.
        let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
            .await?
            .get_chain_id()
            .await?;
        check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

        // Only the `DefaultProcessor` config variant is acceptable for this processor.
        let processor_config = if let ProcessorConfig::DefaultProcessor(inner) =
            self.config.processor_config.clone()
        {
            inner
        } else {
            return Err(anyhow::anyhow!(
                "Invalid processor config for DefaultProcessor: {:?}",
                self.config.processor_config
            ));
        };
        let channel_size = processor_config.channel_size;
        let deprecated_table_flags = TableFlags::from_set(&processor_config.deprecated_tables);

        // Define processor steps
        let stream_config = TransactionStreamConfig {
            starting_version: Some(starting_version),
            ..self.config.transaction_stream_config.clone()
        };
        let transaction_stream = TransactionStreamStep::new(stream_config).await?;
        let default_extractor = DefaultExtractor {
            deprecated_table_flags,
        };
        let default_storer = DefaultStorer::new(self.db_pool.clone(), processor_config);
        let status_saver = get_processor_status_saver(self.db_pool.clone(), self.config.clone());
        let version_tracker =
            VersionTrackerStep::new(status_saver, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS);

        // Connect processor steps together
        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
            transaction_stream.into_runnable_step(),
        )
        .connect_to(default_extractor.into_runnable_step(), channel_size)
        .connect_to(default_storer.into_runnable_step(), channel_size)
        .connect_to(version_tracker.into_runnable_step(), channel_size)
        .end_and_return_output_receiver(channel_size);

        // (Optional) Parse the results: log every batch until the channel reports an error,
        // which is treated as a normal end of the run.
        loop {
            let txn_context = match buffer_receiver.recv().await {
                Ok(ctx) => ctx,
                Err(e) => {
                    info!("No more transactions in channel: {:?}", e);
                    break Ok(());
                },
            };
            debug!(
                "Finished processing versions [{:?}, {:?}]",
                txn_context.metadata.start_version, txn_context.metadata.end_version,
            );
        }
    }
}
1 change: 1 addition & 0 deletions rust/sdk-processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod default_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
pub mod token_v2_processor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::Transaction,
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use processor::{
db::common::models::default_models::{
block_metadata_transactions::BlockMetadataTransactionModel,
move_tables::{CurrentTableItem, TableItem, TableMetadata},
},
processors::default_processor::process_transactions,
worker::TableFlags,
};
// NOTE(review): this constant is not referenced anywhere in the visible part of this file.
// Presumably it is a lower bound on the number of transactions handed to a single rayon
// job when work is split across threads; confirm against its users.
pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;

/// Pipeline step that turns a batch of raw transactions into the default processor's
/// database models by delegating to `process_transactions`.
///
/// The struct is not generic, so it needs no `where` clause: `Sized`, `Send` and
/// `'static` already follow from its single field.
pub struct DefaultExtractor {
    /// Flags passed unchanged to `process_transactions` for every batch.
    pub deprecated_table_flags: TableFlags,
}

#[async_trait]
impl Processable for DefaultExtractor {
    type Input = Vec<Transaction>;
    type Output = (
        Vec<BlockMetadataTransactionModel>,
        Vec<TableItem>,
        Vec<CurrentTableItem>,
        Vec<TableMetadata>,
    );
    type RunType = AsyncRunType;

    /// Extracts the default processor models from one batch of transactions.
    ///
    /// The batch is handed to `process_transactions` together with this step's
    /// `deprecated_table_flags`. The resulting tuple is wrapped in a new
    /// `TransactionContext` that carries over the incoming metadata unchanged.
    /// This step always returns `Ok(Some(..))`.
    async fn process(
        &mut self,
        transactions: TransactionContext<Vec<Transaction>>,
    ) -> Result<
        Option<
            TransactionContext<(
                Vec<BlockMetadataTransactionModel>,
                Vec<TableItem>,
                Vec<CurrentTableItem>,
                Vec<TableMetadata>,
            )>,
        >,
        ProcessorError,
    > {
        // Split the context so the payload can be consumed while the metadata is reused.
        let TransactionContext { data, metadata } = transactions;
        let extracted = process_transactions(data, self.deprecated_table_flags);

        Ok(Some(TransactionContext {
            data: extracted,
            metadata,
        }))
    }
}

// Empty impl: `DefaultExtractor` overrides nothing, so it relies entirely on whatever
// `AsyncStep` provides by default.
impl AsyncStep for DefaultExtractor {}

impl NamedStep for DefaultExtractor {
    /// Returns the fixed name of this step, `"DefaultExtractor"`.
    fn name(&self) -> String {
        String::from("DefaultExtractor")
    }
}
Loading

0 comments on commit 7b2873f

Please sign in to comment.