Skip to content

Commit

Permalink
fix gap detector to update status (#427)
Browse files Browse the repository at this point in the history
* fix gap detector to update status

* update time
  • Loading branch information
yuunlimm authored Jun 25, 2024
1 parent 2f81232 commit e905538
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ pub async fn create_gap_detector_status_tracker_loop(

let mut default_gap_detector = DefaultGapDetector::new(starting_version);
let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version);

let mut last_update_time = std::time::Instant::now();
loop {
match gap_detector_receiver.recv().await {
Ok(ProcessingResult::DefaultProcessingResult(result)) => {
let last_update_time = std::time::Instant::now();
match default_gap_detector
.process_versions(ProcessingResult::DefaultProcessingResult(result))
{
Expand Down Expand Up @@ -87,6 +86,7 @@ pub async fn create_gap_detector_status_tracker_loop(
)
.await
.unwrap();
last_update_time = std::time::Instant::now();
}
}
},
Expand All @@ -112,7 +112,6 @@ pub async fn create_gap_detector_status_tracker_loop(
service_type = PROCESSOR_SERVICE_TYPE,
"[ParquetGapDetector] received parquet gap detector task",
);
let last_update_time = std::time::Instant::now();
match parquet_gap_detector
.process_versions(ProcessingResult::ParquetProcessingResult(result))
{
Expand Down Expand Up @@ -147,6 +146,7 @@ pub async fn create_gap_detector_status_tracker_loop(
)
.await
.unwrap();
last_update_time = std::time::Instant::now();
} else {
tracing::info!("Not Updating last processed version");
}
Expand Down

0 comments on commit e905538

Please sign in to comment.