From 42ef2d9e618efa0cce5c439cde253b362998bcee Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Fri, 23 Aug 2024 14:02:22 -0700 Subject: [PATCH] update the parquet gap detector result to use better name and fix offset issue to start with correction version (#489) --- rust/processor/src/gap_detectors/mod.rs | 6 +++--- rust/processor/src/gap_detectors/parquet_gap_detector.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index 70bb9ca4b..6e1a387a7 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -138,7 +138,7 @@ pub async fn create_gap_detector_status_tracker_loop( if res.num_gaps >= gap_detection_batch_size { tracing::warn!( processor_name, - gap_start_version = res.next_version_to_process, + gap_start_version = res.last_success_version, num_gaps = res.num_gaps, "[Parser] Processed batches with a gap", ); @@ -149,13 +149,13 @@ pub async fn create_gap_detector_status_tracker_loop( >= UPDATE_PROCESSOR_STATUS_SECS { tracing::info!( - last_processed_version = res.next_version_to_process, + last_processed_version = res.last_success_version, processor_name, "Updating last processed version" ); processor .update_last_processed_version( - res.next_version_to_process, + res.last_success_version, res.last_transaction_timestamp, ) .await diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs index 08232f867..ef65ff957 100644 --- a/rust/processor/src/gap_detectors/parquet_gap_detector.rs +++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs @@ -26,7 +26,7 @@ pub struct ParquetFileGapDetectorInner { } pub struct ParquetFileGapDetectorResult { - pub next_version_to_process: u64, + pub last_success_version: u64, pub num_gaps: u64, pub last_transaction_timestamp: Option, } @@ -138,7 +138,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner { self.update_next_version_to_process(self.max_version, &result.table_name); return Ok(GapDetectorResult::ParquetFileGapDetectorResult( ParquetFileGapDetectorResult { - next_version_to_process: self.next_version_to_process as u64, + last_success_version: self.next_version_to_process as u64 - 1, num_gaps: (self.max_version - self.next_version_to_process) as u64, last_transaction_timestamp: result.last_transaction_timestamp, }, @@ -164,7 +164,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner { Ok(GapDetectorResult::ParquetFileGapDetectorResult( ParquetFileGapDetectorResult { - next_version_to_process: self.next_version_to_process as u64, + last_success_version: self.next_version_to_process as u64 - 1, num_gaps: (self.max_version - self.next_version_to_process) as u64, last_transaction_timestamp: result.last_transaction_timestamp, },