diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs
index e44c5c692..7306264cf 100644
--- a/rust/sdk-processor/src/processors/events_processor.rs
+++ b/rust/sdk-processor/src/processors/events_processor.rs
@@ -4,7 +4,9 @@ use crate::{
         processor_config::ProcessorConfig,
     },
     steps::{
-        common::latest_processed_version_tracker::LatestVersionProcessedTracker,
+        common::latest_processed_version_tracker::{
+            LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS,
+        },
         events_processor::{EventsExtractor, EventsStorer},
     },
     utils::{
@@ -18,10 +20,11 @@ use anyhow::Result;
 use aptos_indexer_processor_sdk::{
     aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
     builder::ProcessorBuilder,
-    common_steps::TransactionStreamStep,
+    common_steps::{OrderByVersionStep, TransactionStreamStep},
     traits::IntoRunnableStep,
 };
 use serde::{Deserialize, Serialize};
+use std::time::Duration;
 use tracing::{debug, info};
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -113,11 +116,12 @@ impl EventsProcessor {
             .await?;
         let events_extractor = EventsExtractor {};
         let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config);
-        let version_tracker = LatestVersionProcessedTracker::new(
-            self.db_pool.clone(),
+        let order_step = OrderByVersionStep::new(
             starting_version,
-            processor_name.to_string(),
+            Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS),
         );
+        let version_tracker =
+            LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string());
 
         // Connect processor steps together
         let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
@@ -125,6 +129,7 @@ impl EventsProcessor {
         )
         .connect_to(events_extractor.into_runnable_step(), channel_size)
         .connect_to(events_storer.into_runnable_step(), channel_size)
+        .connect_to(order_step.into_runnable_step(), channel_size)
         .connect_to(version_tracker.into_runnable_step(), channel_size)
         .end_and_return_output_receiver(channel_size);
 
diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs
index 56952d83e..927359697 100644
--- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs
+++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs
@@ -4,7 +4,9 @@ use crate::{
         processor_config::ProcessorConfig,
     },
     steps::{
-        common::latest_processed_version_tracker::LatestVersionProcessedTracker,
+        common::latest_processed_version_tracker::{
+            LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS,
+        },
         fungible_asset_processor::{
             fungible_asset_extractor::FungibleAssetExtractor,
             fungible_asset_storer::FungibleAssetStorer,
         },
@@ -21,11 +23,12 @@ use anyhow::Result;
 use aptos_indexer_processor_sdk::{
     aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
     builder::ProcessorBuilder,
-    common_steps::TransactionStreamStep,
+    common_steps::{OrderByVersionStep, TransactionStreamStep},
     traits::IntoRunnableStep,
 };
 use processor::worker::TableFlags;
 use serde::{Deserialize, Serialize};
+use std::time::Duration;
 use tracing::{debug, info};
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -117,11 +120,12 @@ impl FungibleAssetProcessor {
         let fa_extractor = FungibleAssetExtractor {};
         let fa_storer =
             FungibleAssetStorer::new(self.db_pool.clone(), fa_config, self.deprecated_table_flags);
-        let version_tracker = LatestVersionProcessedTracker::new(
-            self.db_pool.clone(),
+        let order_step = OrderByVersionStep::new(
             starting_version,
-            processor_name.to_string(),
+            Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS),
         );
+        let version_tracker =
+            LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string());
 
         // Connect processor steps together
         let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
@@ -129,6 +133,7 @@ impl FungibleAssetProcessor {
         )
         .connect_to(fa_extractor.into_runnable_step(), channel_size)
         .connect_to(fa_storer.into_runnable_step(), channel_size)
+        .connect_to(order_step.into_runnable_step(), channel_size)
         .connect_to(version_tracker.into_runnable_step(), channel_size)
         .end_and_return_output_receiver(channel_size);
 
diff --git a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs
index 2dc949918..92df33abd 100644
--- a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs
+++ b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs
@@ -14,7 +14,7 @@ use processor::{db::common::models::processor_status::ProcessorStatus, schema::p
 use std::marker::PhantomData;
 use tracing::info;
 
-const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
+pub const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
 
 pub struct LatestVersionProcessedTracker<T>
 where
@@ -23,12 +23,8 @@ where
 {
     conn_pool: ArcDbPool,
     tracker_name: String,
-    // Next version to process that we expect.
-    next_version: u64,
     // Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
     last_success_batch: Option<TransactionContext<()>>,
-    // Tracks all the versions that have been processed out of order.
-    seen_versions: AHashMap<u64, TransactionContext<()>>,
     _marker: PhantomData<T>,
 }
 
@@ -37,30 +33,15 @@ where
     Self: Sized + Send + 'static,
     T: Send + 'static,
 {
-    pub fn new(conn_pool: ArcDbPool, starting_version: u64, tracker_name: String) -> Self {
+    pub fn new(conn_pool: ArcDbPool, tracker_name: String) -> Self {
         Self {
             conn_pool,
             tracker_name,
-            next_version: starting_version,
             last_success_batch: None,
-            seen_versions: AHashMap::new(),
             _marker: PhantomData,
         }
     }
 
-    fn update_last_success_batch(&mut self, current_batch: TransactionContext<()>) {
-        let mut new_prev_batch = current_batch;
-        // While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch.
-        while let Some(next_version) = self
-            .seen_versions
-            .remove(&(new_prev_batch.metadata.end_version + 1))
-        {
-            new_prev_batch = next_version;
-        }
-        self.next_version = new_prev_batch.metadata.end_version + 1;
-        self.last_success_batch = Some(new_prev_batch);
-    }
-
     async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
         // Update the processor status
         if let Some(last_success_batch) = self.last_success_batch.as_ref() {
@@ -113,34 +94,24 @@ where
         &mut self,
         current_batch: TransactionContext<T>,
     ) -> Result<Option<TransactionContext<T>>, ProcessorError> {
-        // info!(
-        //     start_version = current_batch.start_version,
-        //     end_version = current_batch.end_version,
-        //     step_name = self.name(),
-        //     "Processing versions"
-        // );
-        // If there's a gap in the next_version and current_version, save the current_version to seen_versions for
-        // later processing.
-        if self.next_version != current_batch.metadata.start_version {
-            info!(
-                expected_next_version = self.next_version,
-                step = self.name(),
-                batch_version = current_batch.metadata.start_version,
-                "Gap detected",
-            );
-            self.seen_versions
-                .insert(current_batch.metadata.start_version, TransactionContext {
-                    data: (), // No data is needed for tracking.
-                    metadata: current_batch.metadata.clone(),
+        // If there's a gap in version, return an error
+        if let Some(last_success_batch) = self.last_success_batch.as_ref() {
+            if last_success_batch.metadata.end_version + 1 != current_batch.metadata.start_version {
+                return Err(ProcessorError::ProcessError {
+                    message: format!(
+                        "Gap detected starting from version: {}",
+                        current_batch.metadata.start_version
+                    ),
                 });
-        } else {
-            // info!("No gap detected");
-            // If the current_batch is the next expected version, update the last success batch
-            self.update_last_success_batch(TransactionContext {
-                data: (), // No data is needed for tracking.
-                metadata: current_batch.metadata.clone(),
-            });
+            }
         }
+
+        // Update the last success batch
+        self.last_success_batch = Some(TransactionContext {
+            data: (),
+            metadata: current_batch.metadata.clone(),
+        });
+
+        // Pass through
         Ok(Some(current_batch))
     }
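The substantive change in latest_processed_version_tracker.rs is that out-of-order batches are no longer buffered in seen_versions and stitched back together; ordering is delegated to the upstream OrderByVersionStep, and the tracker now fails fast on any gap. The following is a minimal, self-contained sketch of that new contract. Only the version arithmetic and error message mirror the diff; the types, names, and version numbers are toy stand-ins invented for illustration, not the SDK's TransactionContext or ProcessorError.

// Toy stand-in for the SDK's batch metadata; the real tracker operates on
// TransactionContext<()> and returns ProcessorError, not these types.
struct BatchMetadata {
    start_version: u64,
    end_version: u64,
}

#[derive(Default)]
struct GapCheckingTracker {
    last_success_batch: Option<BatchMetadata>,
}

impl GapCheckingTracker {
    // Mirrors the new process() logic: any gap relative to the last successful
    // batch is an error; otherwise the batch becomes the new last_success_batch.
    fn process(&mut self, batch: BatchMetadata) -> Result<(), String> {
        if let Some(last) = self.last_success_batch.as_ref() {
            if last.end_version + 1 != batch.start_version {
                return Err(format!(
                    "Gap detected starting from version: {}",
                    batch.start_version
                ));
            }
        }
        self.last_success_batch = Some(batch);
        Ok(())
    }
}

fn main() {
    let mut tracker = GapCheckingTracker::default();
    // Contiguous batches are accepted, as the upstream ordering step should guarantee.
    assert!(tracker.process(BatchMetadata { start_version: 0, end_version: 99 }).is_ok());
    assert!(tracker.process(BatchMetadata { start_version: 100, end_version: 199 }).is_ok());
    // Versions 200..=249 are missing: previously buffered in seen_versions, now a hard error.
    assert!(tracker.process(BatchMetadata { start_version: 250, end_version: 299 }).is_err());
}

In the two processor pipelines above, order_step is connected immediately before version_tracker, so under normal operation the tracker should never observe such a gap; an error here would point to a problem upstream rather than being part of routine reordering.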