Skip to content

Commit

Permalink
replace panic with returning error
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 18, 2024
1 parent d952fd0 commit b482e4e
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,19 @@ impl ParquetBuffer {

/// Updates the metadata of the internal buffer with the latest information from `cur_batch_metadata`.
/// This is used to track the end version and timestamp of the batch data in the buffer.
pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
pub fn update_current_batch_metadata(
&mut self,
cur_batch_metadata: &TransactionMetadata,
) -> Result<(), ProcessorError> {
if let Some(buffer_metadata) = &mut self.current_batch_metadata {
if buffer_metadata.end_version != cur_batch_metadata.start_version {
// this shouldn't happen but if it does, we want to know
panic!("End version of the buffer metadata does not match the start version of the current batch metadata");
return Err(ProcessorError::ProcessError {
message: format!(
"Gap founded: Buffer metadata end_version mismatch: {} != {}",
buffer_metadata.end_version, cur_batch_metadata.start_version
),
});
}

// Update metadata fields with the current batch's end information
Expand All @@ -50,6 +58,7 @@ impl ParquetBuffer {
// Initialize the metadata with the current batch's start information
self.current_batch_metadata = Some(cur_batch_metadata.clone());
}
Ok(())
}
}

Expand Down Expand Up @@ -145,7 +154,7 @@ impl ParquetBufferStep {

// Append new data to the buffer
Self::append_to_buffer(buffer, parquet_data)?;
buffer.update_current_batch_metadata(cur_batch_metadata);
buffer.update_current_batch_metadata(cur_batch_metadata)?;

debug!(
"Updated buffer size for {:?}: {} bytes",
Expand Down

0 comments on commit b482e4e

Please sign in to comment.