diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index 20bba694d..d0b35c621 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -51,11 +51,10 @@ pub async fn create_gap_detector_status_tracker_loop( let mut default_gap_detector = DefaultGapDetector::new(starting_version); let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version); - + let mut last_update_time = std::time::Instant::now(); loop { match gap_detector_receiver.recv().await { Ok(ProcessingResult::DefaultProcessingResult(result)) => { - let last_update_time = std::time::Instant::now(); match default_gap_detector .process_versions(ProcessingResult::DefaultProcessingResult(result)) { @@ -87,6 +86,7 @@ pub async fn create_gap_detector_status_tracker_loop( ) .await .unwrap(); + last_update_time = std::time::Instant::now(); } } }, @@ -112,7 +112,6 @@ pub async fn create_gap_detector_status_tracker_loop( service_type = PROCESSOR_SERVICE_TYPE, "[ParquetGapDetector] received parquet gap detector task", ); - let last_update_time = std::time::Instant::now(); match parquet_gap_detector .process_versions(ProcessingResult::ParquetProcessingResult(result)) { @@ -147,6 +146,7 @@ pub async fn create_gap_detector_status_tracker_loop( ) .await .unwrap(); + last_update_time = std::time::Instant::now(); } else { tracing::info!("Not Updating last processed version"); }