diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index 30fa3ccc1..0351ab1ab 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -178,6 +178,23 @@ where // This is to cover the case when interval duration has passed but buffer is empty if self.buffer.is_empty() { debug!("Buffer is empty, skipping upload."); + + let parquet_processing_result = ParquetProcessingResult { + start_version: -1, // this is to indicate that nothing was actually uploaded + end_version: -1, + last_transaction_timestamp: None, + txn_version_to_struct_count: None, + parquet_processed_structs: None, + table_name: ParquetType::TABLE_NAME.to_string(), + }; + + self.gap_detector_sender + .send(ProcessingResult::ParquetProcessingResult( + parquet_processing_result, + )) + .await + .expect("[Parser] Failed to send versions to gap detector"); + return Ok(()); } let start_version = self diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs index 794a5f9f1..7c7e7f1bf 100644 --- a/rust/processor/src/gap_detectors/parquet_gap_detector.rs +++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs @@ -3,7 +3,7 @@ use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult}; use ahash::{AHashMap, AHashSet}; -use anyhow::{Context, Result}; +use anyhow::Result; use std::{ cmp::{max, min}, sync::{Arc, Mutex}, @@ -134,9 +134,24 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner { ProcessingResult::ParquetProcessingResult(r) => r, _ => panic!("Invalid result type"), }; - let parquet_processed_structs = result - .parquet_processed_structs - .context("Missing parquet processed transactions")?; + + let parquet_processed_structs = result.parquet_processed_structs.unwrap_or_else(|| { + info!("Interval duration has passed, but there are 
no structs to process."); + AHashMap::new() + }); + + if result.start_version == -1 { + // meaning we didn't really upload anything but we still would like to update the map to reduce memory usage. + self.update_next_version_to_process(self.max_version, &result.table_name); + return Ok(GapDetectorResult::ParquetFileGapDetectorResult( + ParquetFileGapDetectorResult { + next_version_to_process: self.next_version_to_process as u64, + num_gaps: (self.max_version - self.next_version_to_process) as u64, + last_transaction_timestamp: result.last_transaction_timestamp, + }, + )); + } + info!( start_version = result.start_version, end_version = result.end_version, diff --git a/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs index 2564c8fa7..cfa759524 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs @@ -82,7 +82,7 @@ impl Debug for ParquetAnsProcessor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "ParquetAnsProcessor {{ capacity of trnasactions channel: {:?}}}", + "ParquetAnsProcessor {{ capacity of ans_primary_name_v2 channel: {:?}}}", &self.ans_primary_name_v2_sender.capacity() ) } diff --git a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs index bad362ddc..cb4f2621c 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_default_processor.rs @@ -270,9 +270,10 @@ pub fn process_transactions( for detail in wsc_details { match detail { WriteSetChangeDetail::Module(module) => { - move_modules.push(module.clone()); + let txn_version = module.txn_version; + move_modules.push(module); transaction_version_to_struct_count 
- .entry(module.txn_version) + .entry(txn_version) .and_modify(|e| *e += 1) .or_insert(1); },