Skip to content

Commit

Permalink
enable parquet to start processing from the specified starting versio…
Browse files Browse the repository at this point in the history
…n in the config
  • Loading branch information
yuunlimm committed Dec 16, 2024
1 parent 22f92fc commit 7ebce69
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,19 @@ pub async fn get_min_last_success_version_parquet(
.context("Failed to get minimum last success version from DB")?
};

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
Ok(min_processed_version.unwrap_or(
indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0),
))
let config_starting_version = indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0);

Check warning on line 67 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L64-L67

Added lines #L64 - L67 were not covered by tests

if let Some(min_processed_version) = min_processed_version {
Ok(std::cmp::max(
min_processed_version,
config_starting_version,
))

Check warning on line 73 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L69-L73

Added lines #L69 - L73 were not covered by tests
} else {
Ok(config_starting_version)

Check warning on line 75 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L75

Added line #L75 was not covered by tests
}
}

/// Get the minimum last success version from the database for the given processors.
Expand Down Expand Up @@ -208,7 +214,7 @@ async fn get_starting_version_from_db(
// return the higher of the checkpointed version + 1 and `starting_version`.
Ok(status.map(|status| {
std::cmp::max(
status.last_success_version as u64 + 1,
status.last_success_version as u64,
indexer_processor_config
.transaction_stream_config
.starting_version
Expand Down

0 comments on commit 7ebce69

Please sign in to comment.