From 51607384fd14dbb4071d03e0f9705e564ce6908e Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:22:07 -0800 Subject: [PATCH] [SDK-parquet] parquet sized buffer and gcs handler (#602) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add a parquet specific step that stores parquet structs in a buffer and triggers uploads using gcs_handler which can handle any of parquet types defined in the enum ![Screenshot 2024-11-13 at 5.41.29 PM.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/XVbPtMdqqe4K1PNnLQvf/a1d13f34-5d9c-40af-86d0-349660b67e8e.png) --- rust/Cargo.lock | 5 + rust/Cargo.toml | 1 + rust/sdk-processor/Cargo.toml | 18 +- .../src/config/processor_config.rs | 30 +- .../src/parquet_processors/mod.rs | 45 +- .../parquet_default_processor.rs | 125 ++++- .../src/steps/common/gcs_uploader.rs | 189 ++++++++ rust/sdk-processor/src/steps/common/mod.rs | 2 + .../src/steps/common/parquet_buffer_step.rs | 434 ++++++++++++++++++ 9 files changed, 821 insertions(+), 28 deletions(-) create mode 100644 rust/sdk-processor/src/steps/common/gcs_uploader.rs create mode 100644 rust/sdk-processor/src/steps/common/parquet_buffer_step.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8f20e1fe7..d4134b6d1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4202,6 +4202,7 @@ name = "sdk-processor" version = "0.1.0" dependencies = [ "ahash", + "allocative", "anyhow", "aptos-indexer-processor-sdk", "aptos-indexer-processor-sdk-server-framework", @@ -4217,12 +4218,16 @@ dependencies = [ "field_count", "futures", "futures-util", + "google-cloud-storage", "hex", "jemallocator", "kanal", "lazy_static", + "mockall", "native-tls", "num_cpus", + "parquet", + "parquet_derive", "postgres-native-tls", "processor", "rayon", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0534679f3..a81e3962d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -145,3 +145,4 @@ canonical_json = "0.5.0" serde_canonical_json = "1.0.0" allocative = "0.3.3" allocative_derive = "0.3.3" +mockall = "0.12.1" diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml index 6859a36df..51efcd66c 100644 --- a/rust/sdk-processor/Cargo.toml +++ b/rust/sdk-processor/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] ahash = { workspace = true } +allocative = { workspace = true } anyhow = { workspace = true } aptos-indexer-processor-sdk = { workspace = true } aptos-indexer-processor-sdk-server-framework = { workspace = true } @@ -21,11 +22,22 @@ diesel_migrations = { workspace = true } field_count = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } + +google-cloud-storage = { workspace = true } hex = { workspace = true } jemallocator = { workspace = true } kanal = { workspace = true } lazy_static = { workspace = true } + +mockall = { workspace = true } + +# Postgres SSL support +native-tls = { workspace = true } num_cpus = { workspace = true } +# Parquet support +parquet = { workspace = true } +parquet_derive = { workspace = true } +postgres-native-tls = { workspace = true } processor = { workspace = true } rayon = { workspace = true } serde = { workspace = true } @@ -34,14 +46,10 @@ sha2 = { workspace = true } strum = { workspace = true } tiny-keccak = { workspace = true } tokio = { workspace = true } +tokio-postgres = { workspace = true } tracing = { workspace = true } url = { workspace = true } -# Postgres SSL support -native-tls = { workspace = true } 
-postgres-native-tls = { workspace = true } -tokio-postgres = { workspace = true } - [features] libpq = ["diesel/postgres"] # When using the default features we enable the diesel/postgres feature. We configure diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index b490b73a4..57d1af2c8 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -149,10 +149,8 @@ pub struct ParquetDefaultProcessorConfig { pub bucket_name: String, #[serde(default)] pub bucket_root: String, - #[serde( - default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size" - )] - pub parquet_handler_response_channel_size: usize, + #[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")] + pub channel_size: usize, #[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")] pub max_buffer_size: usize, #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")] @@ -165,7 +163,7 @@ pub struct ParquetDefaultProcessorConfig { impl ParquetDefaultProcessorConfig { /// Make the default very large on purpose so that by default it's not chunked /// This prevents any unexpected changes in behavior - pub const fn default_parquet_handler_response_channel_size() -> usize { + pub const fn default_channel_size() -> usize { 100_000 } @@ -191,7 +189,7 @@ mod tests { bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, - parquet_handler_response_channel_size: 10, + channel_size: 10, max_buffer_size: 100000, parquet_upload_interval: 1800, }); @@ -200,11 +198,13 @@ mod tests { assert!(result.is_ok()); let table_names = result.unwrap(); - - assert_eq!(table_names, vec![ - "parquet_default_processor.Transaction".to_string(), - "parquet_default_processor.MoveResource".to_string(), - ]); + let table_names: HashSet = table_names.into_iter().collect(); + let expected_names: HashSet = + ["Transaction".to_string(), "MoveResource".to_string()] + .iter() + .map(|e| format!("parquet_default_processor.{}", e)) + .collect(); + assert_eq!(table_names, expected_names); } #[test] @@ -214,7 +214,7 @@ mod tests { bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, - parquet_handler_response_channel_size: 10, + channel_size: 10, max_buffer_size: 100000, parquet_upload_interval: 1800, }); @@ -234,7 +234,7 @@ mod tests { bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, - parquet_handler_response_channel_size: 10, + channel_size: 10, max_buffer_size: 100000, parquet_upload_interval: 1800, }); @@ -252,7 +252,7 @@ mod tests { bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, - parquet_handler_response_channel_size: 10, + channel_size: 10, max_buffer_size: 100000, parquet_upload_interval: 1800, }); @@ -273,7 +273,7 @@ mod tests { bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, - parquet_handler_response_channel_size: 10, + channel_size: 10, max_buffer_size: 100000, parquet_upload_interval: 1800, }); diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 3c920c709..218c9d8ac 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ 
b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,3 +1,4 @@ +use aptos_indexer_processor_sdk::utils::errors::ProcessorError; use processor::db::parquet::models::default_models::{ parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, @@ -73,11 +74,53 @@ impl ParquetTypeStructs { match self { ParquetTypeStructs::MoveResource(_) => "move_resources", ParquetTypeStructs::WriteSetChange(_) => "write_set_changes", - ParquetTypeStructs::Transaction(_) => "parquet_transactions", + ParquetTypeStructs::Transaction(_) => "transactions", ParquetTypeStructs::TableItem(_) => "table_items", ParquetTypeStructs::MoveModule(_) => "move_modules", } } + + pub fn calculate_size(&self) -> usize { + match self { + ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data), + ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data), + ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data), + ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data), + ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data), + } + } + + /// Appends data to the current buffer within each ParquetTypeStructs variant. + pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> { + match (self, other) { + (ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => { + buf.append(&mut data); + Ok(()) + }, + ( + ParquetTypeStructs::WriteSetChange(buf), + ParquetTypeStructs::WriteSetChange(mut data), + ) => { + buf.append(&mut data); + Ok(()) + }, + (ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => { + buf.append(&mut data); + Ok(()) + }, + (ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => { + buf.append(&mut data); + Ok(()) + }, + (ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => { + buf.append(&mut data); + Ok(()) + }, + _ => Err(ProcessorError::ProcessError { + message: "Mismatched buffer types in append operation".to_string(), + }), + } + } } #[cfg(test)] diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index bb35ea276..c6c0e5e62 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -3,6 +3,14 @@ use crate::{ db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, processor_config::ProcessorConfig, }, + parquet_processors::ParquetTypeEnum, + steps::{ + common::{ + gcs_uploader::{create_new_writer, GCSUploader}, + parquet_buffer_step::ParquetBufferStep, + }, + parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, + }, utils::{ chain_id::check_or_update_chain_id, database::{new_db_pool, run_migrations, ArcDbPool}, @@ -11,8 +19,26 @@ use crate::{ }; use anyhow::Context; use aptos_indexer_processor_sdk::{ - aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait, + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::TransactionStreamStep, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; +use parquet::schema::types::Type; +use 
processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + worker::TableFlags, }; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tracing::{debug, info}; + +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; pub struct ParquetDefaultProcessor { pub config: IndexerProcessorConfig, @@ -65,9 +91,10 @@ impl ProcessorTrait for ParquetDefaultProcessor { // Determine the processing mode (backfill or regular) let is_backfill = self.config.backfill_config.is_some(); + // TODO: Revisit when parquet version tracker is available. // Query the starting version - let _starting_version = if is_backfill { - get_starting_version(&self.config, self.db_pool.clone()).await?; + let starting_version = if is_backfill { + get_starting_version(&self.config, self.db_pool.clone()).await? } else { // Regular mode logic: Fetch the minimum last successful version across all relevant tables let table_names = self @@ -77,7 +104,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { .context("Failed to get table names for the processor")?; get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) - .await?; + .await? }; // Check and update the ledger chain id to ensure we're indexing the correct chain @@ -87,7 +114,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { .await?; check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; - let _parquet_processor_config = match self.config.processor_config.clone() { + let parquet_processor_config = match self.config.processor_config.clone() { ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => { parquet_processor_config }, @@ -99,7 +126,91 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, }; - // Define processor steps - Ok(()) + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let parquet_default_extractor = ParquetDefaultExtractor { + opt_in_tables: TableFlags::empty(), + }; + + let credentials = parquet_processor_config + .google_application_credentials + .clone(); + + if let Some(credentials) = credentials { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + + let gcs_client = Arc::new(GCSClient::new(gcs_config)); + + let parquet_type_to_schemas: HashMap> = [ + (ParquetTypeEnum::MoveResource, MoveResource::schema()), + ( + ParquetTypeEnum::WriteSetChange, + WriteSetChangeModel::schema(), + ), + (ParquetTypeEnum::Transaction, ParquetTransaction::schema()), + (ParquetTypeEnum::TableItem, TableItem::schema()), + (ParquetTypeEnum::MoveModule, MoveModule::schema()), + ] + .into_iter() + .collect(); + + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + let buffer_uploader = GCSUploader::new( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_type_to_writer, + 
parquet_processor_config.bucket_name.clone(), + parquet_processor_config.bucket_root.clone(), + self.name().to_string(), + )?; + + let channel_size = parquet_processor_config.channel_size; + + let default_size_buffer_step = ParquetBufferStep::new( + Duration::from_secs(parquet_processor_config.parquet_upload_interval), + buffer_uploader, + parquet_processor_config.max_buffer_size, + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } } } diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs new file mode 100644 index 000000000..c6d8f435f --- /dev/null +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -0,0 +1,189 @@ +use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}; +use anyhow::Context; +use aptos_indexer_processor_sdk::utils::errors::ProcessorError; +use async_trait::async_trait; +use google_cloud_storage::client::Client as GCSClient; +use parquet::{ + file::{properties::WriterProperties, writer::SerializedFileWriter}, + record::RecordWriter, + schema::types::Type, +}; +use processor::bq_analytics::{ + gcs_handler::upload_parquet_to_gcs, + generic_parquet_processor::{GetTimeStamp, HasParquetSchema, HasVersion}, +}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use tracing::{debug, error}; + +pub struct GCSUploader { + gcs_client: Arc, + parquet_type_to_schemas: HashMap>, + parquet_type_to_writer: HashMap>>, + pub bucket_name: String, + pub bucket_root: String, + pub processor_name: String, +} + +#[async_trait] +pub trait Uploadable { + async fn upload_buffer( + &mut self, + buffer: ParquetTypeStructs, + ) -> anyhow::Result<(), ProcessorError>; +} + +#[async_trait] +impl Uploadable for GCSUploader { + async fn upload_buffer( + &mut self, + buffer: ParquetTypeStructs, + ) -> anyhow::Result<(), ProcessorError> { + let table_name = buffer.get_table_name(); + + let result = match buffer { + ParquetTypeStructs::Transaction(transactions) => { + self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name) + .await + }, + ParquetTypeStructs::MoveResource(resources) => { + self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name) + .await + }, + ParquetTypeStructs::WriteSetChange(changes) => { + self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name) + .await + }, + ParquetTypeStructs::TableItem(items) => { + self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name) + .await + }, + ParquetTypeStructs::MoveModule(modules) => { + self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name) + .await + }, + }; + + if let Err(e) = result { + error!("Failed to upload buffer: {}", e); + return Err(ProcessorError::ProcessError { + message: format!("Failed to upload buffer: {}", e), + }); + } + Ok(()) + } +} + +pub fn create_new_writer(schema: Arc) -> anyhow::Result>> { + let props = 
WriterProperties::builder() + .set_compression(parquet::basic::Compression::LZ4) + .build(); + let props_arc = Arc::new(props); + + SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer") +} + +impl GCSUploader { + pub fn new( + gcs_client: Arc, + parquet_type_to_schemas: HashMap>, + parquet_type_to_writer: HashMap>>, + bucket_name: String, + bucket_root: String, + processor_name: String, + ) -> anyhow::Result { + Ok(Self { + gcs_client, + parquet_type_to_schemas, + parquet_type_to_writer, + bucket_name, + bucket_root, + processor_name, + }) + } + + fn create_new_writer( + &self, + parquet_type: ParquetTypeEnum, + ) -> anyhow::Result>> { + let schema = self + .parquet_type_to_schemas + .get(&parquet_type) + .context("Parquet type not found in schemas")? + .clone(); + + create_new_writer(schema) + } + + /// # Context: Why we replace our writer + /// + /// Once we’re ready to upload (either because the buffer is full or enough time has passed), + /// we don’t want to keep adding new data to that same writer. we want a clean slate for the next batch. + /// So, we replace the old writer with a new one to empty the writer buffer without losing any data. + fn get_and_replace_writer( + &mut self, + parquet_type: ParquetTypeEnum, + ) -> anyhow::Result>> { + let old_writer = self + .parquet_type_to_writer + .remove(&parquet_type) + .context("Writer for specified Parquet type not found")?; + + // Create a new writer and replace the old writer with it + let new_writer = self.create_new_writer(parquet_type)?; + self.parquet_type_to_writer.insert(parquet_type, new_writer); + + // Return the old writer so its contents can be used + Ok(old_writer) + } + + // Generic upload function to handle any data type + async fn upload_generic( + &mut self, + data: &[ParquetType], + parquet_type: ParquetTypeEnum, + table_name: &'static str, + ) -> anyhow::Result<()> + where + ParquetType: HasVersion + GetTimeStamp + HasParquetSchema, + for<'a> &'a [ParquetType]: RecordWriter, + { + if data.is_empty() { + debug!("Buffer is empty, skipping upload."); + return Ok(()); + } + + let writer = self + .parquet_type_to_writer + .get_mut(&parquet_type) + .context("Writer not found for specified parquet type")?; + + let mut row_group_writer = writer.next_row_group().context("Failed to get row group")?; + + data.write_to_row_group(&mut row_group_writer) + .context("Failed to write to row group")?; + + row_group_writer + .close() + .context("Failed to close row group")?; + + let old_writer = self + .get_and_replace_writer(parquet_type) + .context("Failed to close writer")?; + let upload_buffer = old_writer + .into_inner() + .context("Failed to get inner buffer")?; + + let bucket_root = PathBuf::from(&self.bucket_root); + upload_parquet_to_gcs( + &self.gcs_client, + upload_buffer, + table_name, + &self.bucket_name, + &bucket_root, + self.processor_name.clone(), + ) + .await?; + + Ok(()) + } +} diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs index 6b2e05793..18c449997 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1,3 +1,5 @@ +pub mod gcs_uploader; +pub mod parquet_buffer_step; pub mod processor_status_saver; pub use processor_status_saver::get_processor_status_saver; diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs new file mode 100644 index 000000000..315873f51 --- /dev/null +++ 
b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
@@ -0,0 +1,434 @@
+#[allow(unused_imports)]
+use crate::{
+    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
+    steps::common::gcs_uploader::{GCSUploader, Uploadable},
+};
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::{TransactionContext, TransactionMetadata},
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use std::{collections::HashMap, time::Duration};
+use tracing::debug;
+
+/// `ParquetBuffer` holds `ParquetTypeStructs` data
+/// and tracks the buffer size in bytes, along with metadata about the data in the buffer.
+struct ParquetBuffer {
+    pub buffer: ParquetTypeStructs,
+    pub buffer_size_bytes: usize,
+    current_batch_metadata: Option<TransactionMetadata>,
+}
+
+impl ParquetBuffer {
+    fn new(parquet_type: &ParquetTypeEnum) -> Self {
+        Self {
+            buffer: ParquetTypeStructs::default_for_type(parquet_type),
+            buffer_size_bytes: 0,
+            current_batch_metadata: None,
+        }
+    }
+
+    /// Updates the metadata of the internal buffer with the latest information from `cur_batch_metadata`.
+    /// This is used to track the end version and timestamp of the batch data in the buffer.
+    pub fn update_current_batch_metadata(
+        &mut self,
+        cur_batch_metadata: &TransactionMetadata,
+    ) -> Result<(), ProcessorError> {
+        if let Some(buffer_metadata) = &mut self.current_batch_metadata {
+            if buffer_metadata.end_version != cur_batch_metadata.start_version {
+                // This shouldn't happen, but if it does, we want to know.
+                return Err(ProcessorError::ProcessError {
+                    message: format!(
+                        "Gap found: Buffer metadata end_version mismatch: {} != {}",
+                        buffer_metadata.end_version, cur_batch_metadata.start_version
+                    ),
+                });
+            }
+
+            // Update metadata fields with the current batch's end information
+            buffer_metadata.end_version = cur_batch_metadata.end_version;
+            buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
+            buffer_metadata.end_transaction_timestamp =
+                cur_batch_metadata.end_transaction_timestamp.clone();
+        } else {
+            // Initialize the metadata with the current batch's start information
+            self.current_batch_metadata = Some(cur_batch_metadata.clone());
+        }
+        Ok(())
+    }
+}
+
+/// `ParquetBufferStep` is a step that accumulates data in buffers until they reach a specified size limit.
+/// It then uploads the buffered data to Google Cloud Storage (GCS) through an uploader.
+///
+/// This step is typically used to manage large data volumes efficiently by buffering and uploading
+/// only when necessary.
+///
+/// # Uploading
+/// Buffered data is flushed through an `Uploadable` uploader (currently a `GCSUploader`).
+pub struct ParquetBufferStep {
+    internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
+    pub poll_interval: Duration,
+    pub buffer_uploader: GCSUploader,
+    pub buffer_max_size: usize,
+}
+
+impl ParquetBufferStep {
+    pub fn new(
+        poll_interval: Duration,
+        buffer_uploader: GCSUploader,
+        buffer_max_size: usize,
+    ) -> Self {
+        Self {
+            internal_buffers: HashMap::new(),
+            poll_interval,
+            buffer_uploader,
+            buffer_max_size,
+        }
+    }
+
+    fn append_to_buffer(
+        buffer: &mut ParquetBuffer,
+        parquet_data: ParquetTypeStructs,
+    ) -> Result<(), ProcessorError> {
+        buffer.buffer_size_bytes += parquet_data.calculate_size();
+        buffer.buffer.append(parquet_data)
+    }
+
+    /// Handles the addition of `parquet_data` to the buffer for a specified `ParquetTypeEnum`.
+    ///
+    /// We check the size of the buffer + the size of the incoming data before appending it.
+    /// If the sum of the two exceeds the maximum size limit, the buffer content is uploaded to GCS first to avoid
+    /// splitting the batch data, allowing for more efficient and simpler version tracking.
+    async fn upload_buffer_append(
+        &mut self,
+        parquet_type: ParquetTypeEnum,
+        parquet_data: ParquetTypeStructs,
+        cur_batch_metadata: &TransactionMetadata,
+        upload_metadata_map: &mut HashMap<ParquetTypeEnum, TransactionMetadata>,
+    ) -> Result<(), ProcessorError> {
+        // Get or initialize the buffer for the specific ParquetTypeEnum
+        let buffer = self
+            .internal_buffers
+            .entry(parquet_type)
+            .or_insert_with(|| {
+                debug!(
+                    "Initializing buffer for ParquetTypeEnum: {:?}",
+                    parquet_type,
+                );
+                ParquetBuffer::new(&parquet_type)
+            });
+
+        let curr_batch_size_bytes = parquet_data.calculate_size();
+
+        debug!(
+            "Current batch size for {:?}: {} bytes, buffer size before append: {} bytes",
+            parquet_type, curr_batch_size_bytes, buffer.buffer_size_bytes,
+        );
+
+        // If the current buffer size + new batch exceeds max size, upload the buffer
+        if buffer.buffer_size_bytes + curr_batch_size_bytes > self.buffer_max_size {
+            debug!(
+                "Buffer size {} + batch size {} exceeds max size {}. Uploading buffer for {:?}.",
+                buffer.buffer_size_bytes, curr_batch_size_bytes, self.buffer_max_size, parquet_type
+            );
+
+            // Take the current buffer to upload and reset the buffer in place
+            let struct_buffer = std::mem::replace(
+                &mut buffer.buffer,
+                ParquetTypeStructs::default_for_type(&parquet_type),
+            );
+            self.buffer_uploader.upload_buffer(struct_buffer).await?;
+
+            // Record the uploaded batch's metadata before resetting the buffer
+            upload_metadata_map
+                .insert(parquet_type, buffer.current_batch_metadata.clone().unwrap());
+            buffer.buffer_size_bytes = 0;
+            buffer.current_batch_metadata = None;
+        }
+
+        // Append new data to the buffer
+        Self::append_to_buffer(buffer, parquet_data)?;
+        buffer.update_current_batch_metadata(cur_batch_metadata)?;
+
+        debug!(
+            "Updated buffer size for {:?}: {} bytes",
+            parquet_type, buffer.buffer_size_bytes,
+        );
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Processable for ParquetBufferStep {
+    type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+    type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
+    type RunType = PollableAsyncRunType;
+
+    /// Processes incoming `TransactionContext` data by appending it to the appropriate buffers.
+    ///
+    /// If any buffer exceeds the maximum size, its contents are uploaded.
+    /// Returns metadata information on what was uploaded.
+ async fn process( + &mut self, + item: TransactionContext, + ) -> Result>, ProcessorError> { + debug!("Starting process for {} data items", item.data.len()); + + let mut upload_metadata_map = HashMap::new(); + for (parquet_type, parquet_data) in item.data { + self.upload_buffer_append( + parquet_type, + parquet_data, + &item.metadata, + &mut upload_metadata_map, + ) + .await?; + } + + if !upload_metadata_map.is_empty() { + return Ok(Some(TransactionContext { + data: upload_metadata_map, + metadata: item.metadata, // cur_batch_metadata, + })); + } + Ok(None) + } + + async fn cleanup( + &mut self, + ) -> Result>>, ProcessorError> { + let mut metadata_map = HashMap::new(); + debug!("Starting cleanup: uploading all remaining buffers."); + for (parquet_type, mut buffer) in self.internal_buffers.drain() { + if buffer.buffer_size_bytes > 0 { + let struct_buffer = std::mem::replace( + &mut buffer.buffer, + ParquetTypeStructs::default_for_type(&parquet_type), + ); + + self.buffer_uploader.upload_buffer(struct_buffer).await?; + + if let Some(buffer_metadata) = &mut buffer.current_batch_metadata { + buffer_metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64; + metadata_map.insert(parquet_type, buffer_metadata.clone()); + } else { + // This should never happen + panic!( + "Buffer metadata is missing for ParquetTypeEnum: {:?}", + parquet_type + ); + } + } + } + self.internal_buffers.clear(); + + debug!("Cleanup complete: all buffers uploaded."); + if !metadata_map.is_empty() { + return Ok(Some(vec![TransactionContext { + data: metadata_map, + metadata: TransactionMetadata::default(), + }])); + } + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for ParquetBufferStep { + fn poll_interval(&self) -> Duration { + self.poll_interval + } + + /// Polls all buffers to check if any should be uploaded based on the current size. + /// Uploads data and clears the buffer if necessary, and returns upload metadata. 
+ async fn poll( + &mut self, + ) -> Result>>, ProcessorError> { + let mut metadata_map = HashMap::new(); + debug!("Polling to check if any buffers need uploading."); + + for (parquet_type, mut buffer) in self.internal_buffers.drain() { + if buffer.buffer_size_bytes > 0 { + let struct_buffer = std::mem::replace( + &mut buffer.buffer, + ParquetTypeStructs::default_for_type(&parquet_type), + ); + + self.buffer_uploader.upload_buffer(struct_buffer).await?; + + let metadata = buffer.current_batch_metadata.clone().unwrap(); + metadata_map.insert(parquet_type, metadata); + + buffer.buffer_size_bytes = 0; + buffer.current_batch_metadata = None; + } + } + + if !metadata_map.is_empty() { + return Ok(Some(vec![TransactionContext { + data: metadata_map, + metadata: TransactionMetadata::default(), + }])); + } + Ok(None) + } +} + +impl NamedStep for ParquetBufferStep { + fn name(&self) -> String { + "ParquetBufferStep".to_string() + } +} + +#[cfg(test)] +mod tests { + use crate::{ + config::processor_config::ParquetDefaultProcessorConfig, + steps::common::{ + gcs_uploader::{create_new_writer, GCSUploader}, + parquet_buffer_step::{ParquetBufferStep, ParquetTypeEnum, ParquetTypeStructs}, + }, + }; + use aptos_indexer_processor_sdk::{ + traits::Processable, + types::transaction_context::{TransactionContext, TransactionMetadata}, + }; + use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; + use parquet::schema::types::Type; + use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::default_models::parquet_move_resources::MoveResource, + }; + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }; + + #[tokio::test] + #[allow(clippy::needless_return)] + async fn test_parquet_buffer_step_no_upload() -> anyhow::Result<()> { + let poll_interval = Duration::from_secs(10); + let buffer_max_size = 100; + let parquet_processor_config = create_parquet_processor_config(); + + let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?; + let mut parquet_step = + ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size); + + let data = HashMap::from([( + ParquetTypeEnum::MoveResource, + ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource), + )]); + let metadata = TransactionMetadata::default(); + + let result = parquet_step + .process(TransactionContext { data, metadata }) + .await + .unwrap(); + + assert!( + result.is_none(), + "Expected no upload for data below buffer_max_size" + ); + + Ok(()) + } + + #[tokio::test] + #[allow(clippy::needless_return)] + async fn test_parquet_buffer_step_trigger_upload() -> anyhow::Result<()> { + let poll_interval = Duration::from_secs(10); + let buffer_max_size = 25; // Default ParquetTypeStructs for MoveResource is 24 bytes + let parquet_processor_config = create_parquet_processor_config(); + + let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?; + let mut parquet_step = + ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size); + + // Test data below `buffer_max_size` + let data = HashMap::from([( + ParquetTypeEnum::MoveResource, + ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource), + )]); + let metadata = TransactionMetadata::default(); + + let result = parquet_step + .process(TransactionContext { data, metadata }) + .await + .unwrap(); + assert!( + result.is_none(), + "Expected no upload for data below buffer_max_size" + ); + + // Test buffer + data > 
`buffer_max_size` + let data = HashMap::from([( + ParquetTypeEnum::MoveResource, + ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource), + )]); + let metadata = TransactionMetadata::default(); + + let result = parquet_step + .process(TransactionContext { data, metadata }) + .await + .unwrap(); + + assert!( + result.is_some(), + "Expected upload when data exceeds buffer_max_size" + ); + + Ok(()) + } + + async fn create_parquet_uploader( + parquet_processor_config: &ParquetDefaultProcessorConfig, + ) -> anyhow::Result { + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + let gcs_client = Arc::new(GCSClient::new(gcs_config)); + + let parquet_type_to_schemas: HashMap> = + [(ParquetTypeEnum::MoveResource, MoveResource::schema())] + .into_iter() + .collect(); + + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + GCSUploader::new( + gcs_client, + parquet_type_to_schemas, + parquet_type_to_writer, + parquet_processor_config.bucket_name.clone(), + parquet_processor_config.bucket_root.clone(), + "processor_name".to_string(), + ) + } + + fn create_parquet_processor_config() -> ParquetDefaultProcessorConfig { + ParquetDefaultProcessorConfig { + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + parquet_upload_interval: 180, + max_buffer_size: 100, + channel_size: 100, + google_application_credentials: None, + tables: HashSet::new(), + } + } +}
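
Note (reviewer sketch, not part of the patch): this change adds `mockall` to the workspace, but the buffer tests above still build a real `GCSUploader` via `with_auth()`, so they need GCS credentials to run. Below is a minimal, illustrative sketch of how the new `Uploadable` trait could be stubbed with mockall for credential-free unit tests. The `MockUploader` name and the test are hypothetical; wiring such a mock into `ParquetBufferStep` would additionally require making the step generic over `Uploadable`, since it currently takes a concrete `GCSUploader`.

```rust
use crate::{
    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
    steps::common::gcs_uploader::Uploadable,
};
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use async_trait::async_trait;
use mockall::mock;

// Hypothetical mock of the `Uploadable` trait; `mock!` generates `MockUploader`.
mock! {
    pub Uploader {}

    #[async_trait]
    impl Uploadable for Uploader {
        async fn upload_buffer(
            &mut self,
            buffer: ParquetTypeStructs,
        ) -> anyhow::Result<(), ProcessorError>;
    }
}

#[tokio::test]
async fn mock_uploadable_reports_success_without_touching_gcs() {
    let mut uploader = MockUploader::new();
    // Expect exactly one upload call and return Ok without any network I/O.
    uploader
        .expect_upload_buffer()
        .times(1)
        .returning(|_| Ok(()));

    let buffer = ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource);
    uploader.upload_buffer(buffer).await.unwrap();
}
```

A mock along these lines would keep the size-threshold and metadata bookkeeping in `ParquetBufferStep` testable without network access or a service account.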