Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable parquet to start processing from the specified starting version in the config #644

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,19 @@
.context("Failed to get minimum last success version from DB")?
};

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
Ok(min_processed_version.unwrap_or(
indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0),
))
let config_starting_version = indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0);

Check warning on line 67 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L64-L67

Added lines #L64 - L67 were not covered by tests

if let Some(min_processed_version) = min_processed_version {
Ok(std::cmp::max(
min_processed_version,
config_starting_version,
))

Check warning on line 73 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L69-L73

Added lines #L69 - L73 were not covered by tests
} else {
Ok(config_starting_version)

Check warning on line 75 in rust/sdk-processor/src/utils/starting_version.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/utils/starting_version.rs#L75

Added line #L75 was not covered by tests
}
}

/// Get the minimum last success version from the database for the given processors.
Expand Down Expand Up @@ -208,7 +214,7 @@
// return the higher of the checkpointed version + 1 and `starting_version`.
Ok(status.map(|status| {
std::cmp::max(
status.last_success_version as u64 + 1,
status.last_success_version as u64,
indexer_processor_config
.transaction_stream_config
.starting_version
Expand Down
Loading