Skip to content

Commit

Permalink
add order step
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Oct 8, 2024
1 parent e4ef014 commit 8defcfd
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 59 deletions.
15 changes: 10 additions & 5 deletions rust/sdk-processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{
processor_config::ProcessorConfig,
},
steps::{
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
common::latest_processed_version_tracker::{
LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS,
},
events_processor::{EventsExtractor, EventsStorer},
},
utils::{
Expand All @@ -17,9 +19,10 @@ use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
common_steps::{OrderByVersionStep, TransactionStreamStep},
traits::IntoRunnableStep,
};
use std::time::Duration;
use tracing::{debug, info};

pub struct EventsProcessor {
Expand Down Expand Up @@ -94,18 +97,20 @@ impl EventsProcessor {
.await?;
let events_extractor = EventsExtractor {};
let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);
let version_tracker = LatestVersionProcessedTracker::new(
self.db_pool.clone(),
let order_step = OrderByVersionStep::new(
starting_version,
processor_name.to_string(),
Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS),
);
let version_tracker =
LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string());

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(events_extractor.into_runnable_step(), channel_size)
.connect_to(events_storer.into_runnable_step(), channel_size)
.connect_to(order_step.into_runnable_step(), channel_size)
.connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

Expand Down
15 changes: 10 additions & 5 deletions rust/sdk-processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{
processor_config::ProcessorConfig,
},
steps::{
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
common::latest_processed_version_tracker::{
LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS,
},
fungible_asset_processor::{
fungible_asset_extractor::FungibleAssetExtractor,
fungible_asset_storer::FungibleAssetStorer,
Expand All @@ -20,10 +22,11 @@ use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
common_steps::{OrderByVersionStep, TransactionStreamStep},
traits::IntoRunnableStep,
};
use processor::worker::TableFlags;
use std::time::Duration;
use tracing::{debug, info};

pub struct FungibleAssetProcessor {
Expand Down Expand Up @@ -98,18 +101,20 @@ impl FungibleAssetProcessor {
processor_config,
deprecated_table_flags,
);
let version_tracker = LatestVersionProcessedTracker::new(
self.db_pool.clone(),
let order_step = OrderByVersionStep::new(
starting_version,
processor_name.to_string(),
Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS),
);
let version_tracker =
LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string());

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(fa_extractor.into_runnable_step(), channel_size)
.connect_to(fa_storer.into_runnable_step(), channel_size)
.connect_to(order_step.into_runnable_step(), channel_size)
.connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::utils::database::{execute_with_better_error, ArcDbPool};
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
traits::{
Expand All @@ -12,9 +11,8 @@ use async_trait::async_trait;
use diesel::{upsert::excluded, ExpressionMethods};
use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status};
use std::marker::PhantomData;
use tracing::info;

const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
pub const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;

pub struct LatestVersionProcessedTracker<T>
where
Expand All @@ -23,12 +21,8 @@ where
{
conn_pool: ArcDbPool,
tracker_name: String,
// Next version to process that we expect.
next_version: u64,
// Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
last_success_batch: Option<TransactionContext<()>>,
// Tracks all the versions that have been processed out of order.
seen_versions: AHashMap<u64, TransactionContext<()>>,
_marker: PhantomData<T>,
}

Expand All @@ -37,30 +31,15 @@ where
Self: Sized + Send + 'static,
T: Send + 'static,
{
pub fn new(conn_pool: ArcDbPool, starting_version: u64, tracker_name: String) -> Self {
pub fn new(conn_pool: ArcDbPool, tracker_name: String) -> Self {
Self {
conn_pool,
tracker_name,
next_version: starting_version,
last_success_batch: None,
seen_versions: AHashMap::new(),
_marker: PhantomData,
}
}

fn update_last_success_batch(&mut self, current_batch: TransactionContext<()>) {
let mut new_prev_batch = current_batch;
// While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch.
while let Some(next_version) = self
.seen_versions
.remove(&(new_prev_batch.metadata.end_version + 1))
{
new_prev_batch = next_version;
}
self.next_version = new_prev_batch.metadata.end_version + 1;
self.last_success_batch = Some(new_prev_batch);
}

async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
// Update the processor status
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
Expand Down Expand Up @@ -113,34 +92,24 @@ where
&mut self,
current_batch: TransactionContext<T>,
) -> Result<Option<TransactionContext<T>>, ProcessorError> {
// info!(
// start_version = current_batch.start_version,
// end_version = current_batch.end_version,
// step_name = self.name(),
// "Processing versions"
// );
// If there's a gap in the next_version and current_version, save the current_version to seen_versions for
// later processing.
if self.next_version != current_batch.metadata.start_version {
info!(
expected_next_version = self.next_version,
step = self.name(),
batch_version = current_batch.metadata.start_version,
"Gap detected",
);
self.seen_versions
.insert(current_batch.metadata.start_version, TransactionContext {
data: (), // No data is needed for tracking.
metadata: current_batch.metadata.clone(),
// If there's a gap in version, return an error
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
if last_success_batch.metadata.end_version + 1 != current_batch.metadata.start_version {
return Err(ProcessorError::ProcessError {
message: format!(
"Gap detected starting from version: {}",
current_batch.metadata.start_version
),
});
} else {
// info!("No gap detected");
// If the current_batch is the next expected version, update the last success batch
self.update_last_success_batch(TransactionContext {
data: (), // No data is needed for tracking.
metadata: current_batch.metadata.clone(),
});
}
}

// Update the last success batch
self.last_success_batch = Some(TransactionContext {
data: (),
metadata: current_batch.metadata.clone(),
});

// Pass through
Ok(Some(current_batch))
}
Expand Down

0 comments on commit 8defcfd

Please sign in to comment.