From 13b5024badf70a6c14da0a695ee0ff9bc4fa1723 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Tue, 25 Jun 2024 08:34:24 -0700 Subject: [PATCH 1/2] fix gap detector to update status --- rust/processor/src/gap_detectors/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index 20bba694d..0f9518af0 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -51,11 +51,10 @@ pub async fn create_gap_detector_status_tracker_loop( let mut default_gap_detector = DefaultGapDetector::new(starting_version); let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version); - + let last_update_time = std::time::Instant::now(); loop { match gap_detector_receiver.recv().await { Ok(ProcessingResult::DefaultProcessingResult(result)) => { - let last_update_time = std::time::Instant::now(); match default_gap_detector .process_versions(ProcessingResult::DefaultProcessingResult(result)) { @@ -112,7 +111,6 @@ pub async fn create_gap_detector_status_tracker_loop( service_type = PROCESSOR_SERVICE_TYPE, "[ParquetGapDetector] received parquet gap detector task", ); - let last_update_time = std::time::Instant::now(); match parquet_gap_detector .process_versions(ProcessingResult::ParquetProcessingResult(result)) { From ed29911cdca4d957c46886b4590ed7a40b46a6be Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Tue, 25 Jun 2024 08:55:40 -0700 Subject: [PATCH 2/2] update time --- rust/processor/src/gap_detectors/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index 0f9518af0..d0b35c621 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -51,7 +51,7 @@ pub async fn create_gap_detector_status_tracker_loop( let mut default_gap_detector = DefaultGapDetector::new(starting_version); let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version); - let last_update_time = std::time::Instant::now(); + let mut last_update_time = std::time::Instant::now(); loop { match gap_detector_receiver.recv().await { Ok(ProcessingResult::DefaultProcessingResult(result)) => { @@ -86,6 +86,7 @@ pub async fn create_gap_detector_status_tracker_loop( ) .await .unwrap(); + last_update_time = std::time::Instant::now(); } } }, @@ -145,6 +146,7 @@ pub async fn create_gap_detector_status_tracker_loop( ) .await .unwrap(); + last_update_time = std::time::Instant::now(); } else { tracing::info!("Not Updating last processed version"); }