-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable parquet to start processing from the specified starting version in the config #644
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
|
||
if let Some(min_processed_version) = min_processed_version { | ||
Ok(std::cmp::max( | ||
min_processed_version + 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if min_processed_version
= config_starting_version
= 0, then this will return 1 so version 0 will never be processed. Removing the + 1
will fix this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch! will fix tjos
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub async fn get_starting_version( | |
indexer_processor_config: &IndexerProcessorConfig, | |
conn_pool: ArcDbPool, | |
) -> Result<u64> { | |
// Check if there's a checkpoint in the appropriate processor status table. | |
let latest_processed_version = | |
get_starting_version_from_db(indexer_processor_config, conn_pool) | |
.await | |
.context("Failed to get latest processed version from DB")?; | |
// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. | |
Ok(latest_processed_version.unwrap_or( | |
indexer_processor_config | |
.transaction_stream_config | |
.starting_version | |
.unwrap_or(0), | |
)) | |
} |
hmm, in that case, would this also return 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// return the higher of the checkpointed version + 1 and `starting_version`.
Ok(status.map(|status| {
std::cmp::max(
status.last_success_version as u64 + 1,
indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0),
)
}))
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah lol do you mind changing that too please
3dfd2f6
to
7ebce69
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #644 +/- ##
=======================================
- Coverage 47.7% 45.2% -2.6%
=======================================
Files 228 229 +1
Lines 27205 27016 -189
=======================================
- Hits 12983 12216 -767
- Misses 14222 14800 +578 ☔ View full report in Codecov by Sentry. |
Merge activity
|
Description
parquet processor was always starting from the min processed version or 0 if not all tables are processed. And we would like the consistency across all processors where it should start from the higher version.
Test Plan