From c094090d65a777ea6d757fc81bb7790131859bbc Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:52:08 -0800 Subject: [PATCH] enable parquet to start processing from the specified starting version in the config (#644) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Description parquet processor was always starting from the min processed version or 0 if not all tables are processed. And we would like the consistency across all processors where it should start from the higher version. ### Test Plan ![Screenshot 2024-12-11 at 5 27 31 PM](https://github.com/user-attachments/assets/5ffbbc38-d54c-40b6-ada6-6bb6c7616472) ![Screenshot 2024-12-11 at 5 27 25 PM](https://github.com/user-attachments/assets/4c37296c-a89c-4e87-b9df-09c055314179) ![Screenshot 2024-12-11 at 5 26 56 PM](https://github.com/user-attachments/assets/62beaccb-d681-4cbc-9708-3f0ed7751f40) --- .../src/utils/starting_version.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index ec1f6f3a..9c1b58a5 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -61,13 +61,19 @@ pub async fn get_min_last_success_version_parquet( .context("Failed to get minimum last success version from DB")? }; - // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. - Ok(min_processed_version.unwrap_or( - indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0), - )) + let config_starting_version = indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0); + + if let Some(min_processed_version) = min_processed_version { + Ok(std::cmp::max( + min_processed_version, + config_starting_version, + )) + } else { + Ok(config_starting_version) + } } /// Get the minimum last success version from the database for the given processors. @@ -208,7 +214,7 @@ async fn get_starting_version_from_db( // return the higher of the checkpointed version + 1 and `starting_version`. Ok(status.map(|status| { std::cmp::max( - status.last_success_version as u64 + 1, + status.last_success_version as u64, indexer_processor_config .transaction_stream_config .starting_version