Skip to content

Commit

Permalink
[data ingestion] for lagging secondary only update local progress sta…
Browse files Browse the repository at this point in the history
…te (#18927)
  • Loading branch information
phoenix-o authored Aug 8, 2024
1 parent ba9495f commit 3e6540f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 7 deletions.
7 changes: 2 additions & 5 deletions crates/sui-data-ingestion-core/src/progress_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@ impl<P: ProgressStore> ProgressStore for ProgressStoreWrapper<P> {
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> Result<()> {
let last_saved = self.load(task_name.clone()).await?;
if checkpoint_number > last_saved {
if checkpoint_number > self.load(task_name.clone()).await? {
self.progress_store
.save(task_name.clone(), checkpoint_number)
.await?;
self.pending_state.insert(task_name, checkpoint_number);
} else {
self.pending_state.insert(task_name, last_saved);
}
self.pending_state.insert(task_name, checkpoint_number);
Ok(())
}
}
Expand Down
2 changes: 0 additions & 2 deletions crates/sui-data-ingestion-core/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use notify::RecursiveMode;
use notify::Watcher;
use object_store::path::Path;
use object_store::ObjectStore;
use std::cmp::max;
use std::collections::BTreeMap;
use std::ffi::OsString;
use std::fs;
Expand Down Expand Up @@ -293,7 +292,6 @@ impl CheckpointReader {
info!("cleaning processed files, watermark is {}", watermark);
self.data_limiter.gc(watermark);
self.last_pruned_watermark = watermark;
self.current_checkpoint_number = max(self.current_checkpoint_number, watermark);
for entry in fs::read_dir(self.path.clone())? {
let entry = entry?;
let filename = entry.file_name();
Expand Down

0 comments on commit 3e6540f

Please sign in to comment.