From 7a3651e72cadb12626ba2c549d057f16bc4be6a3 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 22 Oct 2024 16:17:01 -0700 Subject: [PATCH 1/4] Migrate events processor to use new version tracker impl --- rust/Cargo.lock | 1 + .../src/models/queryable_models.rs | 4 +- .../src/sdk_tests/events_processor_tests.rs | 1 + .../fungible_asset_processor_tests.rs | 1 + .../down.sql | 2 + .../up.sql | 11 + rust/processor/src/db/postgres/schema.rs | 17 +- rust/sdk-processor/Cargo.toml | 1 + .../src/config/indexer_processor_config.rs | 7 + .../models/backfill_processor_status.rs | 84 ++++++ .../sdk-processor/src/db/common/models/mod.rs | 1 + .../src/processors/events_processor.rs | 19 +- rust/sdk-processor/src/steps/common/mod.rs | 3 + .../steps/common/processor_status_saver.rs | 144 +++++++++ rust/sdk-processor/src/utils/database.rs | 41 +-- .../src/utils/starting_version.rs | 281 ++++++++++++++++-- 16 files changed, 559 insertions(+), 59 deletions(-) create mode 100644 rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/down.sql create mode 100644 rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql create mode 100644 rust/sdk-processor/src/db/common/models/backfill_processor_status.rs create mode 100644 rust/sdk-processor/src/steps/common/processor_status_saver.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b02bc00d8..dab05a4b1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4183,6 +4183,7 @@ dependencies = [ "anyhow", "aptos-indexer-processor-sdk", "aptos-indexer-processor-sdk-server-framework", + "aptos-indexer-testing-framework", "async-trait", "bcs", "bigdecimal", diff --git a/rust/integration-tests/src/models/queryable_models.rs b/rust/integration-tests/src/models/queryable_models.rs index 5105665d5..de925a4ee 100644 --- a/rust/integration-tests/src/models/queryable_models.rs +++ b/rust/integration-tests/src/models/queryable_models.rs @@ -126,7 +126,7 @@ pub struct CurrentFungibleAssetBalance { // pub amount: Option<BigDecimal>, // Added amount field to match schema // pub last_transaction_version_v1: Option<i64>, // pub last_transaction_version_v2: Option<i64>, -// pub last_transaction_version: Option<i64>, // Added to match schema +// pub last_transaction_version: Option<i64>, // Added to match swchema // pub last_transaction_timestamp_v1: Option<chrono::NaiveDateTime>, // pub last_transaction_timestamp_v2: Option<chrono::NaiveDateTime>, // pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, // Added to match schema @@ -149,8 +149,8 @@ pub struct FungibleAssetMetadataModel { pub supply_aggregator_table_handle_v1: Option<String>, pub supply_aggregator_table_key_v1: Option<String>, pub token_standard: String, - pub is_token_v2: Option<bool>, pub inserted_at: chrono::NaiveDateTime, + pub is_token_v2: Option<bool>, pub supply_v2: Option<BigDecimal>, pub maximum_v2: Option<BigDecimal>, } diff --git a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs index 1ac79d334..413a6af4f 100644 --- a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs @@ -34,6 +34,7 @@ pub fn setup_events_processor_config( processor_config, transaction_stream_config, db_config, + backfill_config: None, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs index b44ebbcd9..42fd65c5f 100644 --- a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs +++ 
b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs @@ -35,6 +35,7 @@ pub fn setup_fa_processor_config( processor_config, transaction_stream_config, db_config, + backfill_config: None, }, processor_name, ) diff --git a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/down.sql b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/down.sql new file mode 100644 index 000000000..88f75178c --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS backfill_processor_status; \ No newline at end of file diff --git a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql new file mode 100644 index 000000000..7bcb084e8 --- /dev/null +++ b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql @@ -0,0 +1,11 @@ +-- Your SQL goes here +CREATE TABLE backfill_processor_status ( + backfill_alias VARCHAR(50) NOT NULL, + backfill_status VARCHAR(50) NOT NULL, + last_success_version BIGINT NOT NULL, + last_updated TIMESTAMP NOT NULL DEFAULT NOW(), + last_transaction_timestamp TIMESTAMP NULL, + backfill_start_version BIGINT NOT NULL, + backfill_end_version BIGINT NOT NULL, + PRIMARY KEY (backfill_alias) +); \ No newline at end of file diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index 42634a353..7117444d8 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -84,6 +84,20 @@ diesel::table! { } } +diesel::table! { + backfill_processor_status (backfill_alias) { + #[max_length = 50] + backfill_alias -> Varchar, + #[max_length = 50] + backfill_status -> Varchar, + last_success_version -> Int8, + last_updated -> Timestamp, + last_transaction_timestamp -> Nullable<Timestamp>, + backfill_start_version -> Int8, + backfill_end_version -> Int8, + } +} + diesel::table! { block_metadata_transactions (version) { version -> Int8, @@ -837,8 +851,8 @@ diesel::table!
{ supply_aggregator_table_key_v1 -> Nullable<Text>, #[max_length = 10] token_standard -> Varchar, - is_token_v2 -> Nullable<Bool>, inserted_at -> Timestamp, + is_token_v2 -> Nullable<Bool>, supply_v2 -> Nullable<Numeric>, maximum_v2 -> Nullable<Numeric>, } @@ -1285,6 +1299,7 @@ diesel::allow_tables_to_appear_in_same_query!( ans_lookup_v2, ans_primary_name, ans_primary_name_v2, + backfill_processor_status, block_metadata_transactions, coin_activities, coin_balances, diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml index 620336f3b..7a3e8c1bf 100644 --- a/rust/sdk-processor/Cargo.toml +++ b/rust/sdk-processor/Cargo.toml @@ -9,6 +9,7 @@ ahash = { workspace = true } anyhow = { workspace = true } aptos-indexer-processor-sdk = { workspace = true } aptos-indexer-processor-sdk-server-framework = { workspace = true } +aptos-indexer-testing-framework = { workspace = true} async-trait = { workspace = true } bcs = { workspace = true } bigdecimal = { workspace = true } diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 6b3921613..a8fa30c2d 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -19,6 +19,7 @@ pub struct IndexerProcessorConfig { pub processor_config: ProcessorConfig, pub transaction_stream_config: TransactionStreamConfig, pub db_config: DbConfig, + pub backfill_config: Option<BackfillConfig>, } #[async_trait::async_trait] impl RunnableConfig for IndexerProcessorConfig { @@ -47,3 +48,9 @@ impl RunnableConfig for IndexerProcessorConfig { before_underscore[..before_underscore.len().min(12)].to_string() } } + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct BackfillConfig { + pub backfill_alias: String, +} diff --git a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs new file mode 100644 index 000000000..8670f78cf --- /dev/null +++ b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs @@ -0,0 +1,84 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use crate::utils::database::DbPoolConnection; +use diesel::deserialize; +use diesel::deserialize::{FromSql, FromSqlRow}; +use diesel::expression::AsExpression; +use diesel::pg::{Pg, PgValue}; +use diesel::serialize; +use diesel::serialize::{IsNull, Output, ToSql}; +use diesel::sql_types::Text; +use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable}; +use diesel_async::RunQueryDsl; +use processor::schema::backfill_processor_status; +use std::io::Write; + +#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)] +#[diesel(sql_type = Text)] pub enum BackfillStatus { + // #[diesel(rename = "in_progress")] + InProgress, + // #[diesel(rename = "complete")] + Complete, +} + +impl ToSql<Text, Pg> for BackfillStatus { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + BackfillStatus::InProgress => out.write_all(b"in_progress")?, + BackfillStatus::Complete => out.write_all(b"complete")?, + } + Ok(IsNull::No) + } +} + +impl FromSql<Text, Pg> for BackfillStatus { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result<Self> { + match bytes.as_bytes() { + b"in_progress" => Ok(BackfillStatus::InProgress), + b"complete" => Ok(BackfillStatus::Complete), + _ => Err("Unrecognized enum variant".into()), + } + } +} + +#[derive(AsChangeset, Debug, Insertable)] +#[diesel(table_name = backfill_processor_status)] +/// Only tracking the latest version successfully processed +pub struct BackfillProcessorStatus { + pub backfill_alias: String, + pub backfill_status: BackfillStatus, + pub last_success_version: i64, + pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, + pub backfill_start_version: i64, + pub backfill_end_version: i64, +} + +#[derive(AsChangeset, Debug, Queryable)] +#[diesel(table_name = backfill_processor_status)] +/// Only tracking the latest version successfully processed +pub struct BackfillProcessorStatusQuery { + pub backfill_alias: String, + pub backfill_status: BackfillStatus, + pub last_success_version: i64, + pub last_updated: chrono::NaiveDateTime, + pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, + pub backfill_start_version: i64, + pub backfill_end_version: i64, +} + +impl BackfillProcessorStatusQuery { + pub async fn get_by_processor( + backfill_alias: &str, + conn: &mut DbPoolConnection<'_>, + ) -> diesel::QueryResult<Option<Self>> { + backfill_processor_status::table + .filter(backfill_processor_status::backfill_alias.eq(backfill_alias)) + .first::<Self>(conn) + .await + .optional() + } +} diff --git a/rust/sdk-processor/src/db/common/models/mod.rs b/rust/sdk-processor/src/db/common/models/mod.rs index 0ed33bb3c..5393144c3 100644 --- a/rust/sdk-processor/src/db/common/models/mod.rs +++ b/rust/sdk-processor/src/db/common/models/mod.rs @@ -1,2 +1,3 @@ +pub mod backfill_processor_status; pub mod events_models; pub mod processor_status; diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 853f16da2..1da25f5a7 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -4,9 +4,7 @@ use crate::{ processor_config::ProcessorConfig, }, steps::{ - common::latest_processed_version_tracker::{ - LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS, - }, + common::get_processor_status_saver, events_processor::{EventsExtractor, EventsStorer}, }, utils::{ @@ -19,7 +17,10 @@ use anyhow::Result; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::{OrderByVersionStep, TransactionStreamStep}, + common_steps::{ + OrderByVersionStep, TransactionStreamStep, VersionTrackerStep, + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + }, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; use std::time::Duration; @@ -62,8 +63,6 @@ impl ProcessorTrait for EventsProcessor { } async fn run_processor(&self) -> Result<()> { - let processor_name = self.config.processor_config.name(); - // Run migrations match self.config.db_config { DbConfig::PostgresConfig(ref postgres_config) => { @@ -106,10 +105,12 @@ impl ProcessorTrait for EventsProcessor { let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config); let order_step = OrderByVersionStep::new( starting_version, - Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS), + Duration::from_secs(DEFAULT_UPDATE_PROCESSOR_STATUS_SECS), + ); + let version_tracker = VersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); - let version_tracker = - LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string()); // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( diff --git a/rust/sdk-processor/src/steps/common/mod.rs
b/rust/sdk-processor/src/steps/common/mod.rs index b0479bbe3..91b358fd9 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1 +1,4 @@ pub mod latest_processed_version_tracker; +pub mod processor_status_saver; + +pub use processor_status_saver::get_processor_status_saver; diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs new file mode 100644 index 000000000..67cf5799b --- /dev/null +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -0,0 +1,144 @@ +use crate::{ + config::indexer_processor_config::IndexerProcessorConfig, + db::common::models::{ + backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, + processor_status::ProcessorStatus, + }, + utils::database::{execute_with_better_error, ArcDbPool}, +}; +use anyhow::Result; +use aptos_indexer_processor_sdk::common_steps::ProcessorStatusSaver; +use aptos_indexer_processor_sdk::{ + types::transaction_context::TransactionContext, + utils::{errors::ProcessorError, time::parse_timestamp}, +}; +use async_trait::async_trait; +use diesel::{upsert::excluded, ExpressionMethods}; +use processor::{schema::backfill_processor_status, schema::processor_status}; + +pub fn get_processor_status_saver( + conn_pool: ArcDbPool, + config: IndexerProcessorConfig, +) -> ProcessorStatusSaverEnum { + if let Some(backfill_config) = config.backfill_config { + let txn_stream_cfg = config.transaction_stream_config; + let backfill_start_version = txn_stream_cfg.starting_version; + let backfill_end_version = txn_stream_cfg.request_ending_version; + let backfill_alias = backfill_config.backfill_alias.clone(); + ProcessorStatusSaverEnum::Backfill { + conn_pool, + backfill_alias, + backfill_start_version, + backfill_end_version, + } + } else { + let processor_name = config.processor_config.name().to_string(); + ProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } + } +} + +pub enum ProcessorStatusSaverEnum { + Default { + conn_pool: ArcDbPool, + processor_name: String, + }, + Backfill { + conn_pool: ArcDbPool, + backfill_alias: String, + backfill_start_version: Option<u64>, + backfill_end_version: Option<u64>, + }, +} + +#[async_trait] +impl ProcessorStatusSaver for ProcessorStatusSaverEnum { + async fn save_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + ) -> Result<(), ProcessorError> { + let end_timestamp = last_success_batch + .metadata + .end_transaction_timestamp + .as_ref() + .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) + .map(|t| t.naive_utc()); + match self { + ProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } => { + let status = ProcessorStatus { + processor: processor_name.clone(), + last_success_version: last_success_batch.metadata.end_version as i64, + last_transaction_timestamp: end_timestamp, + }; + + // Save regular processor status to the database + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(processor_status::table) + .values(&status) + .on_conflict(processor_status::processor) + .do_update() + .set(( + processor_status::last_success_version + .eq(excluded(processor_status::last_success_version)), + processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), + )), + Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), + ) + .await?; + + Ok(()) + }, + ProcessorStatusSaverEnum::Backfill { + conn_pool, + backfill_alias, + backfill_start_version, + backfill_end_version, + } => { + let lst_success_version = last_success_batch.metadata.end_version as i64; + let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| { + lst_success_version >= backfill_end_version as i64 + }) { + BackfillStatus::Complete + } else { + BackfillStatus::InProgress + }; + let status = BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status, + last_success_version: lst_success_version, + last_transaction_timestamp: end_timestamp, + backfill_start_version: backfill_start_version.unwrap_or(0) as i64, + + backfill_end_version: backfill_end_version + .unwrap_or(last_success_batch.metadata.end_version) + as i64, + }; + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(backfill_processor_status::table) + .values(&status) + .on_conflict(backfill_processor_status::backfill_alias) + .do_update() + .set(( + backfill_processor_status::backfill_status.eq(excluded(backfill_processor_status::backfill_status)), + backfill_processor_status::last_success_version.eq(excluded(backfill_processor_status::last_success_version)), + backfill_processor_status::last_updated.eq(excluded(backfill_processor_status::last_updated)), + backfill_processor_status::last_transaction_timestamp.eq(excluded(backfill_processor_status::last_transaction_timestamp)), + backfill_processor_status::backfill_start_version.eq(excluded(backfill_processor_status::backfill_start_version)), + backfill_processor_status::backfill_end_version.eq(excluded(backfill_processor_status::backfill_end_version)), + )), + Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), + ).await?; + Ok(()) + }, + } + } +} diff --git a/rust/sdk-processor/src/utils/database.rs b/rust/sdk-processor/src/utils/database.rs index 38d01a128..8930ca09e 100644 --- a/rust/sdk-processor/src/utils/database.rs +++ b/rust/sdk-processor/src/utils/database.rs @@ -5,7 +5,7 @@ #![allow(clippy::extra_unused_lifetimes)] use ahash::AHashMap; -use aptos_indexer_processor_sdk::utils::convert::remove_null_bytes; +use aptos_indexer_processor_sdk::utils::{convert::remove_null_bytes, errors::ProcessorError}; use diesel::{ query_builder::{AstPass, Query, QueryFragment, QueryId}, ConnectionResult, QueryResult, @@ -20,6 +20,7 @@ use diesel_async::{ use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::{future::BoxFuture, FutureExt}; use std::sync::Arc; +use tracing::{info, warn}; pub type Backend = diesel::pg::Pg; @@ -131,7 +132,7 @@ pub async fn execute_in_chunks( build_query: fn(Vec<T>) -> (U, Option<&'static str>), items_to_insert: &[T], chunk_size: usize, -) -> Result<(), diesel::result::Error> +) -> Result<(), ProcessorError> where U: QueryFragment<Pg> + diesel::query_builder::QueryId + Send + 'static, T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static, @@ -163,7 +164,7 @@ pub async fn execute_with_better_error( pool: ArcDbPool, query: U, mut additional_where_clause: Option<&'static str>, -) -> QueryResult<usize> +) -> Result<usize, ProcessorError> where U: QueryFragment<Pg> + diesel::query_builder::QueryId + Send, { @@ -178,19 +179,23 @@ where where_clause: additional_where_clause, }; let debug_string = diesel::debug_query::<Backend, _>(&final_query).to_string(); - tracing::debug!("Executing query: {:?}", debug_string); let conn = &mut pool.get().await.map_err(|e| {
- tracing::warn!("Error getting connection from pool: {:?}", e); - diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UnableToSendCommand, - Box::new(e.to_string()), - ) + warn!("Error getting connection from pool: {:?}", e); + ProcessorError::DBStoreError { + message: format!("{:#}", e), + query: Some(debug_string.clone()), + } })?; - let res = final_query.execute(conn).await; - if let Err(ref e) = res { - tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); - } - res + final_query .execute(conn) .await .inspect_err(|e| { warn!("Error running query: {:?}\n{:?}", e, debug_string); }) .map_err(|e| ProcessorError::DBStoreError { message: format!("{:#}", e), query: Some(debug_string), }) } /// Returns the entry for the config hashmap, or the default field count for the insert. @@ -240,7 +245,7 @@ async fn execute_or_retry_cleaned( items: Vec<T>, query: U, additional_where_clause: Option<&'static str>, -) -> Result<(), diesel::result::Error> +) -> Result<(), ProcessorError> where U: QueryFragment<Pg> + diesel::query_builder::QueryId + Send, T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, @@ -275,12 +280,12 @@ pub fn run_pending_migrations(conn: &mut impl Migr pub async fn run_migrations(postgres_connection_string: String, _conn_pool: ArcDbPool) { use diesel::{Connection, PgConnection}; - tracing::info!("Running migrations: {:?}", postgres_connection_string); + info!("Running migrations: {:?}", postgres_connection_string); let migration_time = std::time::Instant::now(); let mut conn = PgConnection::establish(&postgres_connection_string).expect("migrations failed!"); run_pending_migrations(&mut conn); - tracing::info!( + info!( duration_in_secs = migration_time.elapsed().as_secs_f64(), "[Parser] Finished migrations" ); @@ -292,7 +297,7 @@ pub async fn run_migrations(postgres_connection_string: String, _conn_pool: ArcD pub async fn run_migrations(postgres_connection_string: String, conn_pool: ArcDbPool) { use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; - tracing::info!("Running migrations: {:?}", postgres_connection_string); + info!("Running migrations: {:?}", postgres_connection_string); let conn = conn_pool // We need to use this since AsyncConnectionWrapper doesn't know how to // work with a pooled connection. diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 9b87fd9d5..885855236 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -1,52 +1,275 @@ use super::database::ArcDbPool; -use crate::config::indexer_processor_config::IndexerProcessorConfig; +use crate::{ + config::indexer_processor_config::IndexerProcessorConfig, + db::common::{ + models::backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus}, + models::processor_status::ProcessorStatusQuery, + }, +}; use anyhow::{Context, Result}; -use processor::db::common::models::processor_status::ProcessorStatusQuery; +/// Get the appropriate starting version for the processor. +/// +/// If it is a regular processor, this will return the higher of the checkpointed version + 1 +/// or `starting_version` from the config, or 0 if neither is set. +/// +/// If this is a backfill processor and there is an in-progress backfill, this will return +/// the checkpointed version + 1.
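+/// (For example, an in-progress backfill checkpointed at version 10 resumes at version 11, as exercised by the in-progress test below.)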
+/// +/// If this is a backfill processor and there is not an in-progress backfill (i.e., no checkpoint or +/// backfill status is COMPLETE), this will return `starting_version` from the config, or 0 if not set. pub async fn get_starting_version( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result<u64> { - // If starting_version is set in TransactionStreamConfig, use that - if indexer_processor_config - .transaction_stream_config - .starting_version - .is_some() - { - return Ok(indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap()); - } - - // If it's not set, check if the DB has latest_processed_version set and use that - let latest_processed_version_from_db = + // Check if there's a checkpoint in the appropriate processor status table. + let latest_processed_version = get_latest_processed_version_from_db(indexer_processor_config, conn_pool) .await .context("Failed to get latest processed version from DB")?; - if let Some(latest_processed_version_tracker) = latest_processed_version_from_db { - // Add +1 to start from the version after the last successful version - return Ok(latest_processed_version_tracker + 1); - } - // If latest_processed_version is not stored in DB, return the default 0 - Ok(0) + // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. + Ok(latest_processed_version.unwrap_or( + indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0), + )) } -/// Gets the start version for the processor. If not found, start from 0. -pub async fn get_latest_processed_version_from_db( +async fn get_latest_processed_version_from_db( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result<Option<u64>> { let mut conn = conn_pool.get().await?; - match ProcessorStatusQuery::get_by_processor( + if let Some(backfill_config) = &indexer_processor_config.backfill_config { + let backfill_status = BackfillProcessorStatusQuery::get_by_processor( + &backfill_config.backfill_alias, + &mut conn, + ) + .await + .context("Failed to query backfill_processor_status table.")?; + + // Return None if there is no checkpoint or if the backfill is old (complete). + // Otherwise, return the checkpointed version + 1. + return Ok( + backfill_status.and_then(|status| match status.backfill_status { + BackfillStatus::InProgress => Some(status.last_success_version as u64 + 1), + // If status is Complete, this is the start of a new backfill job. + BackfillStatus::Complete => None, + }), + ); + } + + let status = ProcessorStatusQuery::get_by_processor( indexer_processor_config.processor_config.name(), &mut conn, ) - .await? - { - Some(status) => Ok(Some(status.last_success_version as u64)), - None => Ok(None), + .await + .context("Failed to query processor_status table.")?; + + // Return None if there is no checkpoint. Otherwise, + // return the higher of the checkpointed version + 1 and `starting_version`.
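+ // For example, a checkpoint at version 10 with no configured `starting_version` yields 11, + // while a configured `starting_version` of 20 would take precedence and yield 20.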
+ Ok(status.map(|status| { + std::cmp::max( + status.last_success_version as u64 + 1, + indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0), + ) + })) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + config::{ + db_config::{DbConfig, PostgresConfig}, + indexer_processor_config::{BackfillConfig, IndexerProcessorConfig}, + processor_config::{DefaultProcessorConfig, ProcessorConfig}, + }, + db::common::{ + models::backfill_processor_status::{self, BackfillProcessorStatus, BackfillStatus}, + models::processor_status::ProcessorStatus, + }, + utils::database::{new_db_pool, run_migrations}, + }; + use ahash::AHashMap; + use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; + use aptos_indexer_testing_framework::database::{PostgresTestDatabase, TestDatabase}; + use diesel_async::RunQueryDsl; + use processor::{db::common::models, schema::processor_status}; + use std::collections::HashSet; + use url::Url; + + fn create_indexer_config( + db_url: String, + backfill_config: Option<BackfillConfig>, + starting_version: Option<u64>, + ) -> IndexerProcessorConfig { + let default_processor_config = DefaultProcessorConfig { + per_table_chunk_sizes: AHashMap::new(), + channel_size: 100, + deprecated_tables: HashSet::new(), + }; + let processor_config = ProcessorConfig::EventsProcessor(default_processor_config); + let postgres_config = PostgresConfig { + connection_string: db_url.to_string(), + db_pool_size: 100, + }; + let db_config = DbConfig::PostgresConfig(postgres_config); + return IndexerProcessorConfig { + db_config, + transaction_stream_config: TransactionStreamConfig { + indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + starting_version, + request_ending_version: None, + auth_token: "test".to_string(), + request_name_header: "test".to_string(), + indexer_grpc_http2_ping_interval_secs: 1, + indexer_grpc_http2_ping_timeout_secs: 1, + indexer_grpc_reconnection_timeout_secs: 1, + indexer_grpc_response_item_timeout_secs: 1, + }, + processor_config, + backfill_config, + }; + } + + #[tokio::test] + async fn test_get_starting_version_no_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 0); + } + + #[tokio::test] + async fn test_get_starting_version_no_checkpoint_with_start_ver() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(5)); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 5); + } + + // ::processor_status::ProcessorStatus, schema::processor_status + + #[tokio::test] + async fn test_get_starting_version_with_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); + let
conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor_status::table) + .values(ProcessorStatus { + processor: indexer_processor_config.processor_config.name().to_string(), + last_success_version: 10, + last_transaction_timestamp: None, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 11); + } + + #[tokio::test] + async fn test_backfill_get_starting_version_with_completed_checkpoint() { + let mut db = PostgresTestDatabase::new(); + let backfill_alias = "backfill_processor".to_string(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_alias: backfill_alias.clone(), + }), + None, + ); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor::schema::backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status: BackfillStatus::Complete, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 10, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 0); + } + + #[tokio::test] + async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { + let mut db = PostgresTestDatabase::new(); + let backfill_alias = "backfill_processor".to_string(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_alias: backfill_alias.clone(), + }), + None, + ); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor::schema::backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status: BackfillStatus::InProgress, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 10, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 11); } } From fe0f28d37c5f1412eb3c9e3785f3a30fae938ead Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 22 Oct 2024 16:34:06 -0700 Subject: [PATCH 2/4] Formatting --- rust/sdk-processor/Cargo.toml | 2 +- .../models/backfill_processor_status.rs | 18 +++++----- .../steps/common/processor_status_saver.rs | 4 +-- .../src/utils/starting_version.rs | 35 ++++++++++--------- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml index 7a3e8c1bf..6859a36df 100644 --- a/rust/sdk-processor/Cargo.toml +++ 
b/rust/sdk-processor/Cargo.toml @@ -9,7 +9,7 @@ ahash = { workspace = true } anyhow = { workspace = true } aptos-indexer-processor-sdk = { workspace = true } aptos-indexer-processor-sdk-server-framework = { workspace = true } -aptos-indexer-testing-framework = { workspace = true} +aptos-indexer-testing-framework = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } bigdecimal = { workspace = true } diff --git a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs index 8670f78cf..7c3d8d3bc 100644 --- a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs +++ b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs @@ -4,14 +4,16 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::utils::database::DbPoolConnection; -use diesel::deserialize; -use diesel::deserialize::{FromSql, FromSqlRow}; -use diesel::expression::AsExpression; -use diesel::pg::{Pg, PgValue}; -use diesel::serialize; -use diesel::serialize::{IsNull, Output, ToSql}; -use diesel::sql_types::Text; -use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable}; +use diesel::{ + deserialize, + deserialize::{FromSql, FromSqlRow}, + expression::AsExpression, + pg::{Pg, PgValue}, + serialize, + serialize::{IsNull, Output, ToSql}, + sql_types::Text, + AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable, +}; use diesel_async::RunQueryDsl; use processor::schema::backfill_processor_status; use std::io::Write; diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 67cf5799b..fadc98258 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -7,14 +7,14 @@ use crate::{ utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; -use aptos_indexer_processor_sdk::common_steps::ProcessorStatusSaver; use aptos_indexer_processor_sdk::{ + common_steps::ProcessorStatusSaver, types::transaction_context::TransactionContext, utils::{errors::ProcessorError, time::parse_timestamp}, }; use async_trait::async_trait; use diesel::{upsert::excluded, ExpressionMethods}; -use processor::{schema::backfill_processor_status, schema::processor_status}; +use processor::schema::{backfill_processor_status, processor_status}; pub fn get_processor_status_saver( conn_pool: ArcDbPool, diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 885855236..b765767ff 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -1,9 +1,9 @@ use super::database::ArcDbPool; use crate::{ config::indexer_processor_config::IndexerProcessorConfig, - db::common::{ - models::backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus}, - models::processor_status::ProcessorStatusQuery, + db::common::models::{ + backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus}, + processor_status::ProcessorStatusQuery, }, }; use anyhow::{Context, Result}; @@ -91,9 +91,9 @@ mod tests { indexer_processor_config::{BackfillConfig, IndexerProcessorConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, - db::common::{ - models::backfill_processor_status::{self, BackfillProcessorStatus, BackfillStatus}, - 
models::processor_status::ProcessorStatus, + db::common::models::{ + backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, + processor_status::ProcessorStatus, }, utils::database::{new_db_pool, run_migrations}, }; @@ -101,7 +101,7 @@ mod tests { use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; use aptos_indexer_testing_framework::database::{PostgresTestDatabase, TestDatabase}; use diesel_async::RunQueryDsl; - use processor::{db::common::models, schema::processor_status}; + use processor::schema::processor_status; use std::collections::HashSet; use url::Url; @@ -121,7 +121,7 @@ mod tests { db_pool_size: 100, }; let db_config = DbConfig::PostgresConfig(postgres_config); - return IndexerProcessorConfig { + IndexerProcessorConfig { db_config, transaction_stream_config: TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), @@ -136,10 +136,11 @@ mod tests { }, processor_config, backfill_config, - }; + } } #[tokio::test] + #[allow(clippy::needless_return)] async fn test_get_starting_version_no_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); @@ -153,10 +154,11 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 0); + assert_eq!(starting_version, 0) } #[tokio::test] + #[allow(clippy::needless_return)] async fn test_get_starting_version_no_checkpoint_with_start_ver() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); @@ -170,12 +172,11 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 5); + assert_eq!(starting_version, 5) } - // ::processor_status::ProcessorStatus, schema::processor_status - #[tokio::test] + #[tokio::test] + #[allow(clippy::needless_return)] async fn test_get_starting_version_with_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); @@ -198,10 +199,11 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 11); + assert_eq!(starting_version, 11) } #[tokio::test] + #[allow(clippy::needless_return)] async fn test_backfill_get_starting_version_with_completed_checkpoint() { let mut db = PostgresTestDatabase::new(); let backfill_alias = "backfill_processor".to_string(); @@ -234,10 +236,11 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 0); + assert_eq!(starting_version, 0) } #[tokio::test] + #[allow(clippy::needless_return)] async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { let mut db = PostgresTestDatabase::new(); let backfill_alias = "backfill_processor".to_string(); @@ -270,6 +273,6 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 11); + assert_eq!(starting_version, 11) } } From ba8df18829bbc337c985330dbaa52ee58f530aaf Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 22 Oct 2024 17:30:31 -0700 Subject: [PATCH 3/4] Undo typo --- rust/integration-tests/src/models/queryable_models.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/integration-tests/src/models/queryable_models.rs b/rust/integration-tests/src/models/queryable_models.rs index de925a4ee..eccb4be30 100644 --- a/rust/integration-tests/src/models/queryable_models.rs +++ b/rust/integration-tests/src/models/queryable_models.rs @@ -126,7 +126,7 @@ pub struct CurrentFungibleAssetBalance { // pub amount: Option<BigDecimal>, // Added amount field to match schema // pub last_transaction_version_v1: Option<i64>, // pub last_transaction_version_v2: Option<i64>, -// pub last_transaction_version: Option<i64>, // Added to match swchema +// pub last_transaction_version: Option<i64>, // Added to match schema // pub last_transaction_timestamp_v1: Option<chrono::NaiveDateTime>, // pub last_transaction_timestamp_v2: Option<chrono::NaiveDateTime>, // pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, // Added to match schema From f4ee55460a78c28cdf6249f302d6661c0bb08872 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 23 Oct 2024 14:00:28 -0700 Subject: [PATCH 4/4] Changes from comments --- .../up.sql | 2 +- rust/processor/src/db/postgres/schema.rs | 2 +- .../models/backfill_processor_status.rs | 11 ++++++--- .../src/processors/events_processor.rs | 9 +------- .../processors/fungible_asset_processor.rs | 21 +++++++------------ .../steps/common/processor_status_saver.rs | 6 ++---- .../src/utils/starting_version.rs | 8 +++---- 7 files changed, 23 insertions(+), 36 deletions(-) diff --git a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql index 7bcb084e8..9db3e4b1e 100644 --- a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql +++ b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql @@ -6,6 +6,6 @@ CREATE TABLE backfill_processor_status ( last_updated TIMESTAMP NOT NULL DEFAULT NOW(), last_transaction_timestamp TIMESTAMP NULL, backfill_start_version BIGINT NOT NULL, - backfill_end_version BIGINT NOT NULL, + backfill_end_version BIGINT NULL, PRIMARY KEY (backfill_alias) ); \ No newline at end of file diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index 7117444d8..344e3ad19 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -94,7 +94,7 @@ diesel::table! { last_updated -> Timestamp, last_transaction_timestamp -> Nullable<Timestamp>, backfill_start_version -> Int8, - backfill_end_version -> Int8, + backfill_end_version -> Nullable<Int8>, } } diff --git a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs index 7c3d8d3bc..2af058d91 100644 --- a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs +++ b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs @@ -18,6 +18,9 @@ use diesel_async::RunQueryDsl; use processor::schema::backfill_processor_status; use std::io::Write; +const IN_PROGRESS: &[u8] = b"in_progress"; +const COMPLETE: &[u8] = b"complete"; + #[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)] #[diesel(sql_type = Text)] pub enum BackfillStatus { @@ -30,8 +33,8 @@ pub enum BackfillStatus { impl ToSql<Text, Pg> for BackfillStatus { fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { match *self { - BackfillStatus::InProgress => out.write_all(b"in_progress")?, - BackfillStatus::Complete => out.write_all(b"complete")?, + BackfillStatus::InProgress => out.write_all(IN_PROGRESS)?, + BackfillStatus::Complete => out.write_all(COMPLETE)?, } Ok(IsNull::No) } @@ -56,7 +59,7 @@ pub struct BackfillProcessorStatus { pub last_success_version: i64, pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, pub backfill_start_version: i64, - pub backfill_end_version: i64, + pub backfill_end_version: Option<i64>, } @@ -69,7 +72,7 @@ pub struct BackfillProcessorStatusQuery { pub last_updated: chrono::NaiveDateTime, pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, pub backfill_start_version: i64, - pub backfill_end_version: i64, + pub backfill_end_version: Option<i64>, } impl 
BackfillProcessorStatusQuery { diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 1da25f5a7..124ed0eb9 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -18,12 +18,10 @@ use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, common_steps::{ - OrderByVersionStep, TransactionStreamStep, VersionTrackerStep, - DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, }, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; -use std::time::Duration; use tracing::{debug, info}; pub struct EventsProcessor { @@ -103,10 +101,6 @@ impl ProcessorTrait for EventsProcessor { .await?; let events_extractor = EventsExtractor {}; let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config); - let order_step = OrderByVersionStep::new( - starting_version, - Duration::from_secs(DEFAULT_UPDATE_PROCESSOR_STATUS_SECS), - ); let version_tracker = VersionTrackerStep::new( get_processor_status_saver(self.db_pool.clone(), self.config.clone()), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, @@ -118,7 +112,6 @@ impl ProcessorTrait for EventsProcessor { ) .connect_to(events_extractor.into_runnable_step(), channel_size) .connect_to(events_storer.into_runnable_step(), channel_size) - .connect_to(order_step.into_runnable_step(), channel_size) .connect_to(version_tracker.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index 8044bf88b..a7c16779d 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -4,9 +4,7 @@ use crate::{ processor_config::ProcessorConfig, }, steps::{ - common::latest_processed_version_tracker::{ - LatestVersionProcessedTracker, UPDATE_PROCESSOR_STATUS_SECS, - }, + common::get_processor_status_saver, fungible_asset_processor::{ fungible_asset_extractor::FungibleAssetExtractor, fungible_asset_storer::FungibleAssetStorer, @@ -22,11 +20,12 @@ use anyhow::Result; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::{OrderByVersionStep, TransactionStreamStep}, + common_steps::{ + TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + }, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; use processor::worker::TableFlags; -use std::time::Duration; use tracing::{debug, info}; pub struct FungibleAssetProcessor { @@ -66,8 +65,6 @@ impl ProcessorTrait for FungibleAssetProcessor { } async fn run_processor(&self) -> Result<()> { - let processor_name = self.config.processor_config.name(); - // Run migrations match self.config.db_config { DbConfig::PostgresConfig(ref postgres_config) => { @@ -108,20 +105,16 @@ impl ProcessorTrait for FungibleAssetProcessor { processor_config.clone(), deprecated_table_flags, ); - let order_step = OrderByVersionStep::new( - starting_version, - Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS), + let version_tracker = VersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); - let 
version_tracker = - LatestVersionProcessedTracker::new(self.db_pool.clone(), processor_name.to_string()); - // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) .connect_to(fa_extractor.into_runnable_step(), channel_size) .connect_to(fa_storer.into_runnable_step(), channel_size) - .connect_to(order_step.into_runnable_step(), channel_size) .connect_to(version_tracker.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index fadc98258..9ab95a3a8 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -110,16 +110,14 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { } else { BackfillStatus::InProgress }; + let backfill_end_version_mapped = backfill_end_version.map(|v| v as i64); let status = BackfillProcessorStatus { backfill_alias: backfill_alias.clone(), backfill_status, last_success_version: lst_success_version, last_transaction_timestamp: end_timestamp, backfill_start_version: backfill_start_version.unwrap_or(0) as i64, - - backfill_end_version: backfill_end_version - .unwrap_or(last_success_batch.metadata.end_version) - as i64, + backfill_end_version: backfill_end_version_mapped, }; execute_with_better_error( conn_pool.clone(), diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index b765767ff..2f1d3ba31 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -24,7 +24,7 @@ pub async fn get_starting_version( ) -> Result<u64> { // Check if there's a checkpoint in the appropriate processor status table. let latest_processed_version = - get_latest_processed_version_from_db(indexer_processor_config, conn_pool) + get_starting_version_from_db(indexer_processor_config, conn_pool) .await .context("Failed to get latest processed version from DB")?; @@ -37,7 +37,7 @@ pub async fn get_starting_version( )) } -async fn get_latest_processed_version_from_db( +async fn get_starting_version_from_db( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result<Option<u64>> { @@ -226,7 +226,7 @@ mod tests { last_success_version: 10, last_transaction_timestamp: None, backfill_start_version: 0, - backfill_end_version: 10, + backfill_end_version: Some(10), }) .execute(&mut conn_pool.clone().get().await.unwrap()) .await @@ -263,7 +263,7 @@ mod tests { last_success_version: 10, last_transaction_timestamp: None, backfill_start_version: 0, - backfill_end_version: 10, + backfill_end_version: Some(10), }) .execute(&mut conn_pool.clone().get().await.unwrap()) .await