From 07482c2a16165bde3f82717ba7e95fe850938e6a Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Mon, 7 Oct 2024 19:52:08 -0400 Subject: [PATCH] add order step --- .../src/processors/events_processor.rs | 15 +++-- .../processors/fungible_asset_processor.rs | 15 +++-- .../latest_processed_version_tracker.rs | 67 +++++-------------- 3 files changed, 38 insertions(+), 59 deletions(-) diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 589eae03b..115ffff06 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -4,7 +4,9 @@ use crate::{ processor_config::ProcessorConfig, }, steps::{ - common::latest_processed_version_tracker::LatestVersionProcessedTracker, + common::latest_processed_version_tracker::{ + LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS, + }, events_processor::{EventsExtractor, EventsStorer}, }, utils::{ @@ -17,9 +19,10 @@ use anyhow::Result; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::TransactionStreamStep, + common_steps::{OrderByVersionStep, TransactionStreamStep}, traits::IntoRunnableStep, }; +use std::time::Duration; use tracing::{debug, info}; pub struct EventsProcessor { @@ -94,11 +97,12 @@ impl EventsProcessor { .await?; let events_extractor = EventsExtractor {}; let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config); - let version_tracker = LatestVersionProcessedTracker::new( - self.db_pool.clone(), + let order_step = OrderByVersionStep::new( starting_version, - processor_name.to_string(), + Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS), ); + let version_tracker = + LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string()); // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( @@ -106,6 +110,7 @@ impl EventsProcessor { ) .connect_to(events_extractor.into_runnable_step(), channel_size) .connect_to(events_storer.into_runnable_step(), channel_size) + .connect_to(order_step.into_runnable_step(), channel_size) .connect_to(version_tracker.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index 251766a0b..1351fd07a 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -4,7 +4,9 @@ use crate::{ processor_config::ProcessorConfig, }, steps::{ - common::latest_processed_version_tracker::LatestVersionProcessedTracker, + common::latest_processed_version_tracker::{ + LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS, + }, fungible_asset_processor::{ fungible_asset_extractor::FungibleAssetExtractor, fungible_asset_storer::FungibleAssetStorer, @@ -20,10 +22,11 @@ use anyhow::Result; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::TransactionStreamStep, + common_steps::{OrderByVersionStep, TransactionStreamStep}, traits::IntoRunnableStep, }; use processor::worker::TableFlags; +use std::time::Duration; use tracing::{debug, info}; pub struct FungibleAssetProcessor { @@ -98,11 +101,12 @@ impl FungibleAssetProcessor { processor_config, deprecated_table_flags, ); - let version_tracker = LatestVersionProcessedTracker::new( - self.db_pool.clone(), + let order_step = OrderByVersionStep::new( starting_version, - processor_name.to_string(), + Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS), ); + let version_tracker = + LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string()); // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( @@ -110,6 +114,7 @@ impl FungibleAssetProcessor { ) .connect_to(fa_extractor.into_runnable_step(), channel_size) .connect_to(fa_storer.into_runnable_step(), channel_size) + .connect_to(order_step.into_runnable_step(), channel_size) .connect_to(version_tracker.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs index 2dc949918..46ab231e2 100644 --- a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs +++ b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs @@ -1,5 +1,4 @@ use crate::utils::database::{execute_with_better_error, ArcDbPool}; -use ahash::AHashMap; use anyhow::Result; use aptos_indexer_processor_sdk::{ traits::{ @@ -12,9 +11,8 @@ use async_trait::async_trait; use diesel::{upsert::excluded, ExpressionMethods}; use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status}; use std::marker::PhantomData; -use tracing::info; -const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; +pub const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; pub struct LatestVersionProcessedTracker where @@ -23,12 +21,8 @@ where { conn_pool: ArcDbPool, tracker_name: String, - // Next version to process that we expect. - next_version: u64, // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. last_success_batch: Option>, - // Tracks all the versions that have been processed out of order. - seen_versions: AHashMap>, _marker: PhantomData, } @@ -37,30 +31,15 @@ where Self: Sized + Send + 'static, T: Send + 'static, { - pub fn new(conn_pool: ArcDbPool, starting_version: u64, tracker_name: String) -> Self { + pub fn new(conn_pool: ArcDbPool, tracker_name: String) -> Self { Self { conn_pool, tracker_name, - next_version: starting_version, last_success_batch: None, - seen_versions: AHashMap::new(), _marker: PhantomData, } } - fn update_last_success_batch(&mut self, current_batch: TransactionContext<()>) { - let mut new_prev_batch = current_batch; - // While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch. - while let Some(next_version) = self - .seen_versions - .remove(&(new_prev_batch.metadata.end_version + 1)) - { - new_prev_batch = next_version; - } - self.next_version = new_prev_batch.metadata.end_version + 1; - self.last_success_batch = Some(new_prev_batch); - } - async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { // Update the processor status if let Some(last_success_batch) = self.last_success_batch.as_ref() { @@ -113,34 +92,24 @@ where &mut self, current_batch: TransactionContext, ) -> Result>, ProcessorError> { - // info!( - // start_version = current_batch.start_version, - // end_version = current_batch.end_version, - // step_name = self.name(), - // "Processing versions" - // ); - // If there's a gap in the next_version and current_version, save the current_version to seen_versions for - // later processing. - if self.next_version != current_batch.metadata.start_version { - info!( - expected_next_version = self.next_version, - step = self.name(), - batch_version = current_batch.metadata.start_version, - "Gap detected", - ); - self.seen_versions - .insert(current_batch.metadata.start_version, TransactionContext { - data: (), // No data is needed for tracking. - metadata: current_batch.metadata.clone(), + // If there's a gap in version, return an error + if let Some(last_success_batch) = self.last_success_batch.as_ref() { + if last_success_batch.metadata.end_version + 1 != current_batch.metadata.start_version { + return Err(ProcessorError::ProcessError { + message: format!( + "Gap detected starting from version: {}", + current_batch.metadata.start_version + ), }); - } else { - // info!("No gap detected"); - // If the current_batch is the next expected version, update the last success batch - self.update_last_success_batch(TransactionContext { - data: (), // No data is needed for tracking. - metadata: current_batch.metadata.clone(), - }); + } } + + // Update the last success batch + self.last_success_batch = Some(TransactionContext { + data: (), + metadata: current_batch.metadata.clone(), + }); + // Pass through Ok(Some(current_batch)) }