diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
index dbe9fae41..c6d8f435f 100644
--- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs
+++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
@@ -26,7 +26,7 @@ pub struct GCSUploader {
 
 #[async_trait]
 pub trait Uploadable {
-    async fn handle_buffer(
+    async fn upload_buffer(
         &mut self,
         buffer: ParquetTypeStructs,
     ) -> anyhow::Result<(), ProcessorError>;
@@ -34,11 +34,10 @@ pub trait Uploadable {
 
 #[async_trait]
 impl Uploadable for GCSUploader {
-    async fn handle_buffer(
+    async fn upload_buffer(
         &mut self,
         buffer: ParquetTypeStructs,
     ) -> anyhow::Result<(), ProcessorError> {
-        // Directly call `upload_generic` for each buffer type
         let table_name = buffer.get_table_name();
 
         let result = match buffer {
@@ -115,6 +114,11 @@ impl GCSUploader {
         create_new_writer(schema)
     }
 
+    /// # Context: Why we replace our writer
+    ///
+    /// Once we’re ready to upload (either because the buffer is full or enough time has passed),
+    /// we don’t want to keep adding new data to that same writer; we want a clean slate for the next batch.
+    /// So we replace the old writer with a new one, emptying the writer buffer without losing any data.
     fn get_and_replace_writer(
        &mut self,
        parquet_type: ParquetTypeEnum,
diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
index b81d7186c..9358e9aac 100644
--- a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
+++ b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
@@ -1,6 +1,7 @@
+#[allow(unused_imports)]
 use crate::{
     parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
-    steps::common::gcs_uploader::Uploadable,
+    steps::common::gcs_uploader::{GCSUploader, Uploadable},
 };
 use anyhow::Result;
 use aptos_indexer_processor_sdk::{
@@ -11,7 +12,6 @@ use aptos_indexer_processor_sdk::{
     utils::errors::ProcessorError,
 };
 use async_trait::async_trait;
-use mockall::mock;
 use std::{collections::HashMap, time::Duration};
 use tracing::debug;
 
@@ -36,6 +36,11 @@ impl ParquetBuffer {
     /// This is used to track the end version and timestamp of the batch data in the buffer.
     pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
         if let Some(buffer_metadata) = &mut self.current_batch_metadata {
+            if buffer_metadata.end_version != cur_batch_metadata.start_version {
+                // This shouldn't happen, but if it does, we want to know.
+                panic!("End version of the buffer metadata does not match the start version of the current batch metadata");
+            }
+            // Update metadata fields with the current batch's end information
             buffer_metadata.end_version = cur_batch_metadata.end_version;
             buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
@@ -57,21 +62,19 @@
 ///
 /// # Type Parameters
 /// - `U`: A type that implements the `Uploadable` trait, providing the uploading functionality.
-pub struct ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
-{
+pub struct ParquetBufferStep {
     internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
     pub poll_interval: Duration,
-    pub buffer_uploader: U,
+    pub buffer_uploader: GCSUploader,
     pub buffer_max_size: usize,
 }
 
-impl<U> ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
-{
-    pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
+impl ParquetBufferStep {
+    pub fn new(
+        poll_interval: Duration,
+        buffer_uploader: GCSUploader,
+        buffer_max_size: usize,
+    ) -> Self {
         Self {
             internal_buffers: HashMap::new(),
             poll_interval,
@@ -93,15 +96,13 @@ where
     /// We check the size of the buffer + the size of the incoming data before appending it.
     /// If the sum of the two exceeds the maximum limit size, it uploads the buffer content to GCS to avoid
     /// spliting the batch data, allowing for more efficient and simpler version tracking.
-    async fn handle_buffer_append(
+    async fn upload_buffer_append(
         &mut self,
         parquet_type: ParquetTypeEnum,
         parquet_data: ParquetTypeStructs,
         cur_batch_metadata: &TransactionMetadata,
         upload_metadata_map: &mut HashMap<ParquetTypeEnum, TransactionMetadata>,
     ) -> Result<(), ProcessorError> {
-        let mut file_uploaded = false;
-
         // Get or initialize the buffer for the specific ParquetTypeEnum
         let buffer = self
             .internal_buffers
@@ -133,28 +134,19 @@ where
                 &mut buffer.buffer,
                 ParquetTypeStructs::default_for_type(&parquet_type),
             );
-            self.buffer_uploader.handle_buffer(struct_buffer).await?;
-            file_uploaded = true;
+            self.buffer_uploader.upload_buffer(struct_buffer).await?;
 
             // update this metadata before insert
             upload_metadata_map
                 .insert(parquet_type, buffer.current_batch_metadata.clone().unwrap());
             buffer.buffer_size_bytes = 0;
+            buffer.current_batch_metadata = None;
         }
 
         // Append new data to the buffer
         Self::append_to_buffer(buffer, parquet_data)?;
         buffer.update_current_batch_metadata(cur_batch_metadata);
 
-        // if it wasn't uploaded -> we update only end_version, size, and last timestamp
-        if file_uploaded {
-            if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
-                buffer_metadata.start_version = cur_batch_metadata.start_version;
-                buffer_metadata.start_transaction_timestamp =
-                    cur_batch_metadata.start_transaction_timestamp.clone();
-            }
-        }
-
         debug!(
             "Updated buffer size for {:?}: {} bytes",
             parquet_type, buffer.buffer_size_bytes,
@@ -164,10 +156,7 @@
 }
 
 #[async_trait]
-impl<U> Processable for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
-{
+impl Processable for ParquetBufferStep {
     type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
     type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
     type RunType = PollableAsyncRunType;
@@ -184,7 +173,7 @@ where
         let mut upload_metadata_map = HashMap::new();
 
         for (parquet_type, parquet_data) in item.data {
-            self.handle_buffer_append(
+            self.upload_buffer_append(
                 parquet_type,
                 parquet_data,
                 &item.metadata,
@@ -214,11 +203,17 @@ where
                     ParquetTypeStructs::default_for_type(&parquet_type),
                 );
 
-                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                self.buffer_uploader.upload_buffer(struct_buffer).await?;
 
                 if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
                     buffer_metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
                     metadata_map.insert(parquet_type, buffer_metadata.clone());
+                } else {
+                    // This should never happen
+                    panic!(
+                        "Buffer metadata is missing for ParquetTypeEnum: {:?}",
+                        parquet_type
+                    );
+                }
             }
         }
@@ -236,10 +231,7 @@ where
 }
 
 #[async_trait]
-impl<U> PollableAsyncStep for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
-{
+impl PollableAsyncStep for ParquetBufferStep {
     fn poll_interval(&self) -> Duration {
         self.poll_interval
     }
@@ -259,7 +251,7 @@ where
                     ParquetTypeStructs::default_for_type(&parquet_type),
                 );
 
-                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                self.buffer_uploader.upload_buffer(struct_buffer).await?;
 
                 let metadata = buffer.current_batch_metadata.clone().unwrap();
                 metadata_map.insert(parquet_type, metadata);
@@ -279,47 +271,48 @@ where
     }
 }
 
-impl<U> NamedStep for ParquetBufferStep<U>
-where
-    U: Uploadable + Send + 'static + Sync,
-{
+impl NamedStep for ParquetBufferStep {
     fn name(&self) -> String {
         "ParquetBufferStep".to_string()
     }
 }
 
-mock! {
-    pub Uploadable {}
-    #[async_trait]
-    impl Uploadable for Uploadable {
-        async fn handle_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use crate::steps::common::parquet_buffer_step::{
-        MockUploadable, ParquetBufferStep, ParquetTypeStructs,
+    use crate::{
+        config::processor_config::ParquetDefaultProcessorConfig,
+        steps::common::{
+            gcs_uploader::{create_new_writer, GCSUploader},
+            parquet_buffer_step::{ParquetBufferStep, ParquetTypeEnum, ParquetTypeStructs},
+        },
     };
     use aptos_indexer_processor_sdk::{
         traits::Processable,
         types::transaction_context::{TransactionContext, TransactionMetadata},
     };
-    use std::{collections::HashMap, time::Duration};
+    use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
+    use parquet::schema::types::Type;
+    use processor::{
+        bq_analytics::generic_parquet_processor::HasParquetSchema,
+        db::parquet::models::default_models::parquet_move_resources::MoveResource,
+    };
+    use std::{
+        collections::{HashMap, HashSet},
+        sync::Arc,
+        time::Duration,
+    };
 
     #[tokio::test]
     #[allow(clippy::needless_return)]
-    async fn test_parquet_buffer_step_no_upload() {
+    async fn test_parquet_buffer_step_no_upload() -> anyhow::Result<()> {
         let poll_interval = Duration::from_secs(10);
         let buffer_max_size = 100;
-        use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;
-
-        let mock_uploader = MockUploadable::new();
+        let parquet_processor_config = create_parquet_processor_config();
+        let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?;
 
         let mut parquet_step =
-            ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
+            ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size);
 
-        // Test the `process` method with data below `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -330,30 +323,27 @@
             .process(TransactionContext { data, metadata })
             .await
             .unwrap();
+
         assert!(
             result.is_none(),
             "Expected no upload for data below buffer_max_size"
         );
+
+        Ok(())
     }
 
     #[tokio::test]
     #[allow(clippy::needless_return)]
-    async fn test_parquet_buffer_step() {
+    async fn test_parquet_buffer_step_trigger_upload() -> anyhow::Result<()> {
         let poll_interval = Duration::from_secs(10);
-        let buffer_max_size = 25; // default parquetTYpeStruct for move resource is 24 bytes
-        use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;
-
-        let mut mock_uploader = MockUploadable::new();
-
-        // Set up expectations for the mock uploader
-        mock_uploader
-            .expect_handle_buffer()
-            .times(1)
-            .returning(|_| Ok(()));
+        let buffer_max_size = 25; // Default ParquetTypeStructs for MoveResource is 24 bytes
+        let parquet_processor_config = create_parquet_processor_config();
+        let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?;
 
         let mut parquet_step =
-            ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
-        // Test the `process` method with data below `buffer_max_size`
+            ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size);
+
+        // Test data below `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -369,7 +359,7 @@
             "Expected no upload for data below buffer_max_size"
         );
 
-        // Test the `process` method with data below `buffer_max_size`
+        // Test buffer + data > `buffer_max_size`
         let data = HashMap::from([(
             ParquetTypeEnum::MoveResource,
             ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -380,9 +370,56 @@
             .process(TransactionContext { data, metadata })
             .await
             .unwrap();
+
         assert!(
             result.is_some(),
             "Expected upload when data exceeds buffer_max_size"
         );
+
+        Ok(())
+    }
+
+    async fn create_parquet_uploader(
+        parquet_processor_config: &ParquetDefaultProcessorConfig,
+    ) -> anyhow::Result<GCSUploader> {
+        let gcs_config = GcsClientConfig::default()
+            .with_auth()
+            .await
+            .expect("Failed to create GCS client config");
+        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+
+        let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> =
+            [(ParquetTypeEnum::MoveResource, MoveResource::schema())]
+                .into_iter()
+                .collect();
+
+        let parquet_type_to_writer = parquet_type_to_schemas
+            .iter()
+            .map(|(key, schema)| {
+                let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
+                (*key, writer)
+            })
+            .collect();
+
+        GCSUploader::new(
+            gcs_client,
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            parquet_processor_config.bucket_name.clone(),
+            parquet_processor_config.bucket_root.clone(),
+            "processor_name".to_string(),
+        )
+    }
+
+    fn create_parquet_processor_config() -> ParquetDefaultProcessorConfig {
+        ParquetDefaultProcessorConfig {
+            bucket_name: "bucket_name".to_string(),
+            bucket_root: "bucket_root".to_string(),
+            parquet_upload_interval: 180,
+            max_buffer_size: 100,
+            channel_size: 100,
+            google_application_credentials: None,
+            tables: HashSet::new(),
+        }
     }
 }
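Illustrative note (not part of the diff): the buffering policy in `upload_buffer_append` can be summarized as "flush the whole buffer before appending whenever buffer + incoming would exceed the limit, then restart metadata tracking from scratch", with the new panic guarding that buffered batches stay contiguous. Below is a minimal, self-contained Rust sketch of that policy under those assumptions; `Batch` and `Buffer` are hypothetical stand-ins for `TransactionMetadata` tracking and `ParquetBuffer`, not types from this PR.

// Hypothetical sketch only; mirrors the flush-then-append and contiguity
// behavior described above, not the real ParquetBufferStep implementation.
struct Batch {
    start_version: u64,
    end_version: u64,
    size_bytes: usize,
}

struct Buffer {
    pending: Vec<Batch>,
    size_bytes: usize,
    max_size_bytes: usize,
    end_version: Option<u64>, // stands in for current_batch_metadata
}

impl Buffer {
    /// Returns the batches that had to be flushed before `incoming` was appended.
    fn append(&mut self, incoming: Batch) -> Vec<Batch> {
        let mut flushed = Vec::new();
        if self.size_bytes + incoming.size_bytes > self.max_size_bytes {
            // Flush everything first so a single upload never splits a batch,
            // then clear the metadata so the next batch starts a clean slate.
            flushed = std::mem::take(&mut self.pending);
            self.size_bytes = 0;
            self.end_version = None;
        }
        if let Some(end_version) = self.end_version {
            // Mirrors the new panic: the buffered end version must equal the
            // incoming batch's start version, i.e. batches arrive contiguously.
            assert_eq!(end_version, incoming.start_version, "non-contiguous batch");
        }
        self.end_version = Some(incoming.end_version);
        self.size_bytes += incoming.size_bytes;
        self.pending.push(incoming);
        flushed
    }
}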