From b482e4ea657ed626c893ff4477854ac9900c1b07 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Mon, 18 Nov 2024 15:26:15 -0800 Subject: [PATCH] replace panic with returning error --- .../src/steps/common/parquet_buffer_step.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs index 9358e9aac..315873f51 100644 --- a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs +++ b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs @@ -34,11 +34,19 @@ impl ParquetBuffer { /// Updates the metadata of the internal buffer with the latest information from `cur_batch_metadata`. /// This is used to track the end version and timestamp of the batch data in the buffer. - pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) { + pub fn update_current_batch_metadata( + &mut self, + cur_batch_metadata: &TransactionMetadata, + ) -> Result<(), ProcessorError> { if let Some(buffer_metadata) = &mut self.current_batch_metadata { if buffer_metadata.end_version != cur_batch_metadata.start_version { // this shouldn't happen but if it does, we want to know - panic!("End version of the buffer metadata does not match the start version of the current batch metadata"); + return Err(ProcessorError::ProcessError { + message: format!( + "Gap founded: Buffer metadata end_version mismatch: {} != {}", + buffer_metadata.end_version, cur_batch_metadata.start_version + ), + }); } // Update metadata fields with the current batch's end information @@ -50,6 +58,7 @@ impl ParquetBuffer { // Initialize the metadata with the current batch's start information self.current_batch_metadata = Some(cur_batch_metadata.clone()); } + Ok(()) } } @@ -145,7 +154,7 @@ impl ParquetBufferStep { // Append new data to the buffer Self::append_to_buffer(buffer, parquet_data)?; - buffer.update_current_batch_metadata(cur_batch_metadata); + buffer.update_current_batch_metadata(cur_batch_metadata)?; debug!( "Updated buffer size for {:?}: {} bytes",