diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index ec1f6f3a..9c1b58a5 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -61,13 +61,19 @@ pub async fn get_min_last_success_version_parquet( .context("Failed to get minimum last success version from DB")? }; - // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. - Ok(min_processed_version.unwrap_or( - indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0), - )) + let config_starting_version = indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0); + + if let Some(min_processed_version) = min_processed_version { + Ok(std::cmp::max( + min_processed_version, + config_starting_version, + )) + } else { + Ok(config_starting_version) + } } /// Get the minimum last success version from the database for the given processors. @@ -208,7 +214,7 @@ async fn get_starting_version_from_db( // return the higher of the checkpointed version + 1 and `starting_version`. Ok(status.map(|status| { std::cmp::max( - status.last_success_version as u64 + 1, + status.last_success_version as u64, indexer_processor_config .transaction_stream_config .starting_version