From 40b32d4dfbbcfaa74982ff78fd37e98ce6707ffd Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Fri, 15 Nov 2024 10:30:49 -0800
Subject: [PATCH] add comments

---
 .../src/steps/common/gcs_uploader.rs        | 10 ++-
 .../src/steps/common/parquet_buffer_step.rs | 69 ++++++++-----------
 2 files changed, 37 insertions(+), 42 deletions(-)

diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
index dbe9fae41..c6d8f435f 100644
--- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs
+++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
@@ -26,7 +26,7 @@ pub struct GCSUploader {

 #[async_trait]
 pub trait Uploadable {
-    async fn handle_buffer(
+    async fn upload_buffer(
         &mut self,
         buffer: ParquetTypeStructs,
     ) -> anyhow::Result<(), ProcessorError>;
@@ -34,11 +34,10 @@ pub trait Uploadable {

 #[async_trait]
 impl Uploadable for GCSUploader {
-    async fn handle_buffer(
+    async fn upload_buffer(
         &mut self,
         buffer: ParquetTypeStructs,
     ) -> anyhow::Result<(), ProcessorError> {
-        // Directly call `upload_generic` for each buffer type
         let table_name = buffer.get_table_name();

         let result = match buffer {
@@ -115,6 +114,11 @@ impl GCSUploader {
         create_new_writer(schema)
     }

+    /// # Context: Why we replace our writer
+    ///
+    /// Once we're ready to upload (either because the buffer is full or enough time has passed),
+    /// we don't want to keep adding new data to that same writer; we want a clean slate for the next batch.
+    /// So we replace the old writer with a new one to empty the writer buffer without losing any data.
     fn get_and_replace_writer(
         &mut self,
         parquet_type: ParquetTypeEnum,
diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
index b81d7186c..285498958 100644
--- a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
+++ b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
@@ -14,6 +14,8 @@ use async_trait::async_trait;
 use mockall::mock;
 use std::{collections::HashMap, time::Duration};
 use tracing::debug;
+use crate::steps::common::gcs_uploader::GCSUploader;
+

 /// `ParquetBuffer` is a struct that holds `ParquetTypeStructs` data
 /// and tracks the buffer size in bytes, along with metadata about the data in the buffer.
@@ -36,6 +38,11 @@ impl ParquetBuffer {
     /// This is used to track the end version and timestamp of the batch data in the buffer.
     pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
         if let Some(buffer_metadata) = &mut self.current_batch_metadata {
+            if buffer_metadata.end_version != cur_batch_metadata.start_version {
+                // This shouldn't happen, but if it does we want to know.
+                panic!("End version of the buffer metadata does not match the start version of the current batch metadata");
+            }
+
             // Update metadata fields with the current batch's end information
             buffer_metadata.end_version = cur_batch_metadata.end_version;
             buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
@@ -57,21 +64,17 @@ impl ParquetBuffer {
 ///
 /// # Type Parameters
 /// - `U`: A type that implements the `Uploadable` trait, providing the uploading functionality.
-pub struct ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
+pub struct ParquetBufferStep
 {
     internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
     pub poll_interval: Duration,
-    pub buffer_uploader: U,
+    pub buffer_uploader: GCSUploader,
     pub buffer_max_size: usize,
 }

-impl<U> ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
+impl ParquetBufferStep
 {
-    pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
+    pub fn new(poll_interval: Duration, buffer_uploader: GCSUploader, buffer_max_size: usize) -> Self {
         Self {
             internal_buffers: HashMap::new(),
             poll_interval,
@@ -93,7 +96,7 @@ where
     /// We check the size of the buffer + the size of the incoming data before appending it.
     /// If the sum of the two exceeds the maximum limit size, it uploads the buffer content to GCS to avoid
     /// spliting the batch data, allowing for more efficient and simpler version tracking.
-    async fn handle_buffer_append(
+    async fn upload_buffer_append(
         &mut self,
         parquet_type: ParquetTypeEnum,
         parquet_data: ParquetTypeStructs,
@@ -133,28 +136,20 @@ where
                &mut buffer.buffer,
                ParquetTypeStructs::default_for_type(&parquet_type),
            );

-            self.buffer_uploader.handle_buffer(struct_buffer).await?;
+            self.buffer_uploader.upload_buffer(struct_buffer).await?;
             file_uploaded = true;
             // update this metadata before insert
             upload_metadata_map
                 .insert(parquet_type, buffer.current_batch_metadata.clone().unwrap());
             buffer.buffer_size_bytes = 0;
+            buffer.current_batch_metadata = None;
         }

         // Append new data to the buffer
         Self::append_to_buffer(buffer, parquet_data)?;
         buffer.update_current_batch_metadata(cur_batch_metadata);

-        // if it wasn't uploaded -> we update only end_version, size, and last timestamp
-        if file_uploaded {
-            if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
-                buffer_metadata.start_version = cur_batch_metadata.start_version;
-                buffer_metadata.start_transaction_timestamp =
-                    cur_batch_metadata.start_transaction_timestamp.clone();
-            }
-        }
-
         debug!(
             "Updated buffer size for {:?}: {} bytes",
             parquet_type, buffer.buffer_size_bytes,
@@ -164,9 +159,7 @@ where
 }

 #[async_trait]
-impl<U> Processable for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
+impl Processable for ParquetBufferStep
 {
     type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
     type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
@@ -184,7 +177,7 @@ where
         let mut upload_metadata_map = HashMap::new();

         for (parquet_type, parquet_data) in item.data {
-            self.handle_buffer_append(
+            self.upload_buffer_append(
                 parquet_type,
                 parquet_data,
                 &item.metadata,
@@ -214,11 +207,14 @@ where
                    ParquetTypeStructs::default_for_type(&parquet_type),
                );

-                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                self.buffer_uploader.upload_buffer(struct_buffer).await?;

                 if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
                     buffer_metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
                     metadata_map.insert(parquet_type, buffer_metadata.clone());
+                } else {
+                    // This should never happen
+                    panic!("Buffer metadata is missing for ParquetTypeEnum: {:?}", parquet_type);
                 }
             }
         }
@@ -236,9 +232,7 @@ where
 }

 #[async_trait]
-impl<U> PollableAsyncStep for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
+impl PollableAsyncStep for ParquetBufferStep
 {
     fn poll_interval(&self) -> Duration {
         self.poll_interval
@@ -259,7 +253,7 @@ where
                    ParquetTypeStructs::default_for_type(&parquet_type),
                );

-                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                self.buffer_uploader.upload_buffer(struct_buffer).await?;
                 let metadata =
                     buffer.current_batch_metadata.clone().unwrap();
                 metadata_map.insert(parquet_type, metadata);
@@ -279,9 +273,7 @@ where
     }
 }

-impl<U> NamedStep for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
+impl NamedStep for ParquetBufferStep
 {
     fn name(&self) -> String {
         "ParquetBufferStep".to_string()
@@ -292,7 +284,7 @@ mock! {
     pub Uploadable {}
     #[async_trait]
     impl Uploadable for Uploadable {
-        async fn handle_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
+        async fn upload_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
     }
 }

@@ -319,7 +311,6 @@ mod tests {
         let mut parquet_step =
             ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);

-        // Test the `process` method with data below `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -338,22 +329,22 @@ mod tests {

     #[tokio::test]
     #[allow(clippy::needless_return)]
-    async fn test_parquet_buffer_step() {
+    async fn test_parquet_buffer_step_trigger_upload() {
         let poll_interval = Duration::from_secs(10);
-        let buffer_max_size = 25; // default parquetTYpeStruct for move resource is 24 bytes
+        let buffer_max_size = 25; // default ParquetTypeStructs for move resource is 24 bytes
         use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;

         let mut mock_uploader = MockUploadable::new();

-        // Set up expectations for the mock uploader
         mock_uploader
-            .expect_handle_buffer()
+            .expect_upload_buffer()
             .times(1)
             .returning(|_| Ok(()));

         let mut parquet_step =
             ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
-        // Test the `process` method with data below `buffer_max_size`
+
+        // Test the `process` method with data still below `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -369,7 +360,7 @@ mod tests {
             "Expected no upload for data below buffer_max_size"
         );

-        // Test the `process` method with data below `buffer_max_size`
+        // Test the `process` method with buffer + data > `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
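
Note on the writer swap described by the new doc comment on get_and_replace_writer: below is a
minimal, self-contained sketch of that pattern, not code from this patch. The Writer, TypeEnum,
and Uploader types are simplified stand-ins for the real parquet writer, ParquetTypeEnum, and
GCSUploader, whose full definitions are not shown in this diff.

use std::collections::HashMap;

// Simplified stand-ins (assumptions, not the real types from this patch).
#[derive(Debug, Default)]
struct Writer {
    rows: Vec<String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum TypeEnum {
    MoveResource,
}

struct Uploader {
    writers: HashMap<TypeEnum, Writer>,
}

impl Uploader {
    // Swap the filled writer for a fresh one and hand the old one back to the caller,
    // so its buffered rows can be uploaded while the next batch starts from a clean slate.
    fn get_and_replace_writer(&mut self, parquet_type: TypeEnum) -> Writer {
        let slot = self.writers.entry(parquet_type).or_default();
        std::mem::replace(slot, Writer::default())
    }
}

fn main() {
    let mut uploader = Uploader {
        writers: HashMap::new(),
    };
    uploader
        .writers
        .entry(TypeEnum::MoveResource)
        .or_default()
        .rows
        .push("row 1".to_string());

    // The returned writer keeps the buffered rows; the stored writer is now empty.
    let old = uploader.get_and_replace_writer(TypeEnum::MoveResource);
    assert_eq!(old.rows.len(), 1);
    assert!(uploader.writers[&TypeEnum::MoveResource].rows.is_empty());
}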