Skip to content

Commit

Permalink
enable parquet to start processing from the specified starting versio…
Browse files Browse the repository at this point in the history
…n in the config (#644)

### Description 
parquet processor was always starting from the min processed version or 0 if not all tables are processed. And we would like the consistency across all processors where it should start from the higher version.

### Test Plan
![Screenshot 2024-12-11 at 5 27 31 PM](https://github.com/user-attachments/assets/5ffbbc38-d54c-40b6-ada6-6bb6c7616472)
![Screenshot 2024-12-11 at 5 27 25 PM](https://github.com/user-attachments/assets/4c37296c-a89c-4e87-b9df-09c055314179)

![Screenshot 2024-12-11 at 5 26 56 PM](https://github.com/user-attachments/assets/62beaccb-d681-4cbc-9708-3f0ed7751f40)
  • Loading branch information
yuunlimm authored Dec 16, 2024
1 parent 97eccf2 commit c094090
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,19 @@ pub async fn get_min_last_success_version_parquet(
.context("Failed to get minimum last success version from DB")?
};

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
Ok(min_processed_version.unwrap_or(
indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0),
))
let config_starting_version = indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0);

if let Some(min_processed_version) = min_processed_version {
Ok(std::cmp::max(
min_processed_version,
config_starting_version,
))
} else {
Ok(config_starting_version)
}
}

/// Get the minimum last success version from the database for the given processors.
Expand Down Expand Up @@ -208,7 +214,7 @@ async fn get_starting_version_from_db(
// return the higher of the checkpointed version + 1 and `starting_version`.
Ok(status.map(|status| {
std::cmp::max(
status.last_success_version as u64 + 1,
status.last_success_version as u64,
indexer_processor_config
.transaction_stream_config
.starting_version
Expand Down

0 comments on commit c094090

Please sign in to comment.