diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 8f20e1fe7..e45a1a3cb 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -4202,6 +4202,7 @@ name = "sdk-processor"
 version = "0.1.0"
 dependencies = [
  "ahash",
+ "allocative",
  "anyhow",
  "aptos-indexer-processor-sdk",
  "aptos-indexer-processor-sdk-server-framework",
@@ -4217,12 +4218,15 @@ dependencies = [
  "field_count",
  "futures",
  "futures-util",
+ "google-cloud-storage",
  "hex",
  "jemallocator",
  "kanal",
  "lazy_static",
  "native-tls",
  "num_cpus",
+ "parquet",
+ "parquet_derive",
  "postgres-native-tls",
  "processor",
  "rayon",
diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml
index 6859a36df..06dbfea92 100644
--- a/rust/sdk-processor/Cargo.toml
+++ b/rust/sdk-processor/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 [dependencies]
 ahash = { workspace = true }
+allocative = { workspace = true }
 anyhow = { workspace = true }
 aptos-indexer-processor-sdk = { workspace = true }
 aptos-indexer-processor-sdk-server-framework = { workspace = true }
@@ -21,11 +22,20 @@ diesel_migrations = { workspace = true }
 field_count = { workspace = true }
 futures = { workspace = true }
 futures-util = { workspace = true }
+
+google-cloud-storage = { workspace = true }
 hex = { workspace = true }
 jemallocator = { workspace = true }
 kanal = { workspace = true }
 lazy_static = { workspace = true }
+
+# Postgres SSL support
+native-tls = { workspace = true }
 num_cpus = { workspace = true }
+# Parquet support
+parquet = { workspace = true }
+parquet_derive = { workspace = true }
+postgres-native-tls = { workspace = true }
 processor = { workspace = true }
 rayon = { workspace = true }
 serde = { workspace = true }
@@ -34,14 +44,10 @@ sha2 = { workspace = true }
 strum = { workspace = true }
 tiny-keccak = { workspace = true }
 tokio = { workspace = true }
+tokio-postgres = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }
-# Postgres SSL support
-native-tls = { workspace = true }
-postgres-native-tls = { workspace = true }
-tokio-postgres = { workspace = true }
-
 [features]
 libpq = ["diesel/postgres"]
 # When using the default features we enable the diesel/postgres feature. We configure
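The new `parquet`, `parquet_derive`, `google-cloud-storage`, and `allocative` dependencies back the GCS parquet pipeline added below. As a point of reference, here is a minimal, self-contained sketch (not taken from this repo; the struct and field names are illustrative) of the parquet_derive + SerializedFileWriter pattern the processor relies on, writing rows into an in-memory buffer:

    // Illustrative only: derive a parquet RecordWriter for a row struct and
    // serialize a slice of rows into a Vec<u8>, the same kind of in-memory sink
    // the GCS uploader below hands to upload_parquet_to_gcs.
    use parquet::{
        file::{properties::WriterProperties, writer::SerializedFileWriter},
        record::RecordWriter,
    };
    use parquet_derive::ParquetRecordWriter;
    use std::sync::Arc;

    #[derive(ParquetRecordWriter)]
    struct ExampleRow {
        txn_version: i64,
        table_name: String,
    }

    fn write_rows_to_buffer(rows: &[ExampleRow]) -> anyhow::Result<Vec<u8>> {
        // The derive macro implements `RecordWriter` for `&[ExampleRow]`, which also
        // exposes the parquet schema generated for the struct.
        let schema = rows.schema()?;
        let props = Arc::new(WriterProperties::builder().build());
        let mut writer: SerializedFileWriter<Vec<u8>> =
            SerializedFileWriter::new(Vec::new(), schema, props)?;
        let mut row_group = writer.next_row_group()?;
        rows.write_to_row_group(&mut row_group)?;
        row_group.close()?;
        // `into_inner` finalizes the file footer and hands back the underlying buffer.
        Ok(writer.into_inner()?)
    }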
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index b490b73a4..11fa4f239 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -149,10 +149,8 @@ pub struct ParquetDefaultProcessorConfig {
     pub bucket_name: String,
     #[serde(default)]
     pub bucket_root: String,
-    #[serde(
-        default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size"
-    )]
-    pub parquet_handler_response_channel_size: usize,
+    #[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
+    pub channel_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
     pub max_buffer_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
@@ -165,7 +163,7 @@ pub struct ParquetDefaultProcessorConfig {
 impl ParquetDefaultProcessorConfig {
     /// Make the default very large on purpose so that by default it's not chunked
     /// This prevents any unexpected changes in behavior
-    pub const fn default_parquet_handler_response_channel_size() -> usize {
+    pub const fn default_channel_size() -> usize {
         100_000
     }
@@ -191,7 +189,7 @@ mod tests {
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
-            parquet_handler_response_channel_size: 10,
+            channel_size: 10,
             max_buffer_size: 100000,
             parquet_upload_interval: 1800,
         });
@@ -214,7 +212,7 @@ mod tests {
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
-            parquet_handler_response_channel_size: 10,
+            channel_size: 10,
             max_buffer_size: 100000,
             parquet_upload_interval: 1800,
         });
@@ -234,7 +232,7 @@ mod tests {
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
-            parquet_handler_response_channel_size: 10,
+            channel_size: 10,
             max_buffer_size: 100000,
             parquet_upload_interval: 1800,
         });
@@ -252,7 +250,7 @@ mod tests {
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
-            parquet_handler_response_channel_size: 10,
+            channel_size: 10,
             max_buffer_size: 100000,
             parquet_upload_interval: 1800,
        });
@@ -273,7 +271,7 @@ mod tests {
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
-            parquet_handler_response_channel_size: 10,
+            channel_size: 10,
             max_buffer_size: 100000,
             parquet_upload_interval: 1800,
         });
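For context on the rename above: `channel_size` keeps the same serde-default behavior the old `parquet_handler_response_channel_size` field had, so configs that omit it still get 100_000. A minimal stand-alone sketch of that behavior (the struct here is a stand-in, not the real config type; serde_json is used only for brevity, the real configs are YAML):

    // Illustrative only: field-level serde default backed by an associated function.
    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct ChannelConfig {
        #[serde(default = "ChannelConfig::default_channel_size")]
        channel_size: usize,
    }

    impl ChannelConfig {
        const fn default_channel_size() -> usize {
            100_000
        }
    }

    fn main() -> anyhow::Result<()> {
        // Omitting the field falls back to the default...
        let empty: ChannelConfig = serde_json::from_str("{}")?;
        assert_eq!(empty.channel_size, 100_000);
        // ...while configs that set it explicitly keep their value.
        let set: ChannelConfig = serde_json::from_str(r#"{"channel_size": 10}"#)?;
        assert_eq!(set.channel_size, 10);
        Ok(())
    }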
diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs
index 20372b609..8e73aa81f 100644
--- a/rust/sdk-processor/src/parquet_processors/mod.rs
+++ b/rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,3 +1,4 @@
+use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
 use processor::db::common::models::default_models::{
     parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
     parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
@@ -73,11 +74,53 @@ impl ParquetTypeStructs {
         match self {
             ParquetTypeStructs::MoveResource(_) => "move_resources",
             ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
-            ParquetTypeStructs::Transaction(_) => "parquet_transactions",
+            ParquetTypeStructs::Transaction(_) => "transactions",
             ParquetTypeStructs::TableItem(_) => "table_items",
             ParquetTypeStructs::MoveModule(_) => "move_modules",
         }
     }
+
+    pub fn calculate_size(&self) -> usize {
+        match self {
+            ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data),
+        }
+    }
+
+    /// Appends data to the current buffer within each ParquetTypeStructs variant.
+    pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> {
+        match (self, other) {
+            (ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => {
+                buf.append(&mut data);
+                Ok(())
+            },
+            (
+                ParquetTypeStructs::WriteSetChange(buf),
+                ParquetTypeStructs::WriteSetChange(mut data),
+            ) => {
+                buf.append(&mut data);
+                Ok(())
+            },
+            (ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => {
+                buf.append(&mut data);
+                Ok(())
+            },
+            (ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => {
+                buf.append(&mut data);
+                Ok(())
+            },
+            (ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => {
+                buf.append(&mut data);
+                Ok(())
+            },
+            _ => Err(ProcessorError::ProcessError {
+                message: "Mismatched buffer types in append operation".to_string(),
+            }),
+        }
+    }
 }
 #[cfg(test)]
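The `append` method above only merges buffers of the same variant and surfaces a `ProcessorError` on any mismatch. The same pattern, reduced to a two-variant enum with plain `Vec` buffers so it can be run in isolation (names here are illustrative):

    // Illustrative only: append only when the variants match; a mismatch is an
    // error instead of silently mixing row types.
    enum RowBuffer {
        Transactions(Vec<u64>),
        TableItems(Vec<String>),
    }

    impl RowBuffer {
        fn append(&mut self, other: RowBuffer) -> Result<(), String> {
            match (self, other) {
                (RowBuffer::Transactions(buf), RowBuffer::Transactions(mut data)) => {
                    buf.append(&mut data);
                    Ok(())
                },
                (RowBuffer::TableItems(buf), RowBuffer::TableItems(mut data)) => {
                    buf.append(&mut data);
                    Ok(())
                },
                _ => Err("Mismatched buffer types in append operation".to_string()),
            }
        }
    }

    fn main() {
        let mut buffer = RowBuffer::Transactions(vec![1, 2]);
        buffer.append(RowBuffer::Transactions(vec![3])).unwrap();
        assert!(buffer.append(RowBuffer::TableItems(vec!["k".into()])).is_err());
    }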
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index bb35ea276..6fe07c16d 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -3,6 +3,12 @@ use crate::{
         db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
         processor_config::ProcessorConfig,
     },
+    parquet_processors::ParquetTypeEnum,
+    steps::parquet_default_processor::{
+        gcs_handler::{create_new_writer, GCSUploader},
+        parquet_default_extractor::ParquetDefaultExtractor,
+        size_buffer::SizeBufferStep,
+    },
     utils::{
         chain_id::check_or_update_chain_id,
         database::{new_db_pool, run_migrations, ArcDbPool},
@@ -11,8 +17,26 @@ use crate::{
 };
 use anyhow::Context;
 use aptos_indexer_processor_sdk::{
-    aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
+    aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
+    builder::ProcessorBuilder,
+    common_steps::TransactionStreamStep,
+    traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
+};
+use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
+use parquet::schema::types::Type;
+use processor::{
+    bq_analytics::generic_parquet_processor::HasParquetSchema,
+    db::common::models::default_models::{
+        parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
+        parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
+        parquet_write_set_changes::WriteSetChangeModel,
+    },
+    worker::TableFlags,
 };
+use std::{collections::HashMap, sync::Arc, time::Duration};
+use tracing::{debug, info};
+
+const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
 pub struct ParquetDefaultProcessor {
     pub config: IndexerProcessorConfig,
@@ -65,9 +89,10 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         // Determine the processing mode (backfill or regular)
         let is_backfill = self.config.backfill_config.is_some();
+        // TODO: Revisit when parquet version tracker is available.
         // Query the starting version
-        let _starting_version = if is_backfill {
-            get_starting_version(&self.config, self.db_pool.clone()).await?;
+        let starting_version = if is_backfill {
+            get_starting_version(&self.config, self.db_pool.clone()).await?
         } else {
             // Regular mode logic: Fetch the minimum last successful version across all relevant tables
             let table_names = self
                 .config
                 .processor_config
                 .get_table_names()
                 .context("Failed to get table names for the processor")?;
             get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
-                .await?;
+                .await?
         };
         // Check and update the ledger chain id to ensure we're indexing the correct chain
@@ -87,7 +112,7 @@
             .await?;
         check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
-        let _parquet_processor_config = match self.config.processor_config.clone() {
+        let parquet_processor_config = match self.config.processor_config.clone() {
             ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => {
                 parquet_processor_config
             },
@@ -99,7 +124,92 @@
             },
         };
-        // Define processor steps
-        Ok(())
+        // Define processor transaction stream config
+        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
+            starting_version: Some(starting_version),
+            ..self.config.transaction_stream_config.clone()
+        })
+        .await?;
+
+        let parquet_default_extractor = ParquetDefaultExtractor {
+            opt_in_tables: TableFlags::empty(),
+        };
+
+        let credentials = parquet_processor_config
+            .google_application_credentials
+            .clone();
+
+        if let Some(credentials) = credentials {
+            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+        }
+
+        let gcs_config = GcsClientConfig::default()
+            .with_auth()
+            .await
+            .expect("Failed to create GCS client config");
+
+        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+
+        let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
+            (ParquetTypeEnum::MoveResource, MoveResource::schema()),
+            (
+                ParquetTypeEnum::WriteSetChange,
+                WriteSetChangeModel::schema(),
+            ),
+            (ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
+            (ParquetTypeEnum::TableItem, TableItem::schema()),
+            (ParquetTypeEnum::MoveModule, MoveModule::schema()),
+        ]
+        .into_iter()
+        .collect();
+
+        let parquet_type_to_writer = parquet_type_to_schemas
+            .iter()
+            .map(|(key, schema)| {
+                let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
+                (*key, writer)
+            })
+            .collect();
+
+        let buffer_uploader = GCSUploader::new(
+            gcs_client.clone(),
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            parquet_processor_config.bucket_name.clone(),
+            parquet_processor_config.bucket_root.clone(),
+            self.name().to_string(),
+        )?;
+
+        let channel_size = parquet_processor_config.channel_size;
+
+        let default_size_buffer_step = SizeBufferStep::new(
+            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+            buffer_uploader,
+            parquet_processor_config.max_buffer_size,
+        );
+
+        // Connect processor steps together
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+            transaction_stream.into_runnable_step(),
+        )
+        .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
+        .connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
+
+        // (Optional) Parse the results
+        loop {
+            match buffer_receiver.recv().await {
+                Ok(txn_context) => {
+                    debug!(
+                        "Finished processing versions [{:?}, {:?}]",
+                        txn_context.metadata.start_version, txn_context.metadata.end_version,
+                    );
+                },
+                Err(e) => {
+                    info!("No more transactions in channel: {:?}", e);
+                    break Ok(());
+                },
+            }
+        }
     }
 }
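The run() wiring above is: transaction stream -> ParquetDefaultExtractor -> SizeBufferStep -> output receiver, with the SDK's ProcessorBuilder providing the channels between steps. A rough, std-only sketch of that dataflow shape (threads and sync channels as stand-ins for the SDK's runnable steps; this is not the SDK API):

    // Illustrative only: a tiny stream -> extractor -> size buffer pipeline.
    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (raw_tx, raw_rx) = mpsc::sync_channel::<u64>(100); // "transaction stream"
        let (parsed_tx, parsed_rx) = mpsc::sync_channel::<String>(100); // extractor output

        // "Extractor": turns raw versions into rows.
        let extractor = thread::spawn(move || {
            for version in raw_rx {
                let _ = parsed_tx.send(format!("row@{version}"));
            }
        });

        // "Size buffer": accumulates rows and flushes once the batch is big enough.
        let uploader = thread::spawn(move || {
            let mut batch = Vec::new();
            for row in parsed_rx {
                batch.push(row);
                if batch.len() >= 3 {
                    println!("uploading batch of {}", batch.len());
                    batch.clear();
                }
            }
            // Final flush mirrors the step's cleanup() on shutdown.
            if !batch.is_empty() {
                println!("uploading final batch of {}", batch.len());
            }
        });

        for version in 0..7 {
            raw_tx.send(version).unwrap();
        }
        drop(raw_tx);
        extractor.join().unwrap();
        uploader.join().unwrap();
    }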
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
new file mode 100644
index 000000000..73ae412d6
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
@@ -0,0 +1,185 @@
+use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs};
+use anyhow::Context;
+use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
+use async_trait::async_trait;
+use google_cloud_storage::client::Client as GCSClient;
+use parquet::{
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    record::RecordWriter,
+    schema::types::Type,
+};
+use processor::bq_analytics::{
+    gcs_handler::upload_parquet_to_gcs,
+    generic_parquet_processor::{GetTimeStamp, HasParquetSchema, HasVersion},
+};
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
+use tracing::{debug, error};
+
+pub struct GCSUploader {
+    gcs_client: Arc<GCSClient>,
+    parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+    parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub processor_name: String,
+}
+
+#[async_trait]
+pub trait Uploadable {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError>;
+}
+
+#[async_trait]
+impl Uploadable for GCSUploader {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError> {
+        // Directly call `upload_generic` for each buffer type
+        let table_name = buffer.get_table_name();
+
+        let result = match buffer {
+            ParquetTypeStructs::Transaction(transactions) => {
+                self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name)
+                    .await
+            },
+            ParquetTypeStructs::MoveResource(resources) => {
+                self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name)
+                    .await
+            },
+            ParquetTypeStructs::WriteSetChange(changes) => {
+                self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name)
+                    .await
+            },
+            ParquetTypeStructs::TableItem(items) => {
+                self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name)
+                    .await
+            },
+            ParquetTypeStructs::MoveModule(modules) => {
+                self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name)
+                    .await
+            },
+        };
+
+        if let Err(e) = result {
+            error!("Failed to upload buffer: {}", e);
+            return Err(ProcessorError::ProcessError {
+                message: format!("Failed to upload buffer: {}", e),
+            });
+        }
+        Ok(())
+    }
+}
+
+pub fn create_new_writer(schema: Arc<Type>) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+    let props = WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::LZ4)
+        .build();
+    let props_arc = Arc::new(props);
+
+    SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
+}
+
+impl GCSUploader {
+    pub fn new(
+        gcs_client: Arc<GCSClient>,
+        parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+        parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+        bucket_name: String,
+        bucket_root: String,
+        processor_name: String,
+    ) -> anyhow::Result<Self> {
+        Ok(Self {
+            gcs_client,
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            bucket_name,
+            bucket_root,
+            processor_name,
+        })
+    }
+
+    fn create_new_writer(
+        &self,
+        parquet_type: ParquetTypeEnum,
+    ) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+        let schema = self
+            .parquet_type_to_schemas
+            .get(&parquet_type)
+            .context("Parquet type not found in schemas")?
+            .clone();
+
+        create_new_writer(schema)
+    }
+
+    fn close_writer(
+        &mut self,
+        parquet_type: ParquetTypeEnum,
+    ) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+        let old_writer = self
+            .parquet_type_to_writer
+            .remove(&parquet_type)
+            .context("Writer for specified Parquet type not found")?;
+
+        // Create a new writer and replace the old writer with it
+        let new_writer = self.create_new_writer(parquet_type)?;
+        self.parquet_type_to_writer.insert(parquet_type, new_writer);
+
+        // Return the old writer so its contents can be used
+        Ok(old_writer)
+    }
+
+    // Generic upload function to handle any data type
+    async fn upload_generic<ParquetType>(
+        &mut self,
+        data: &[ParquetType],
+        parquet_type: ParquetTypeEnum,
+        table_name: &'static str,
+    ) -> anyhow::Result<()>
+    where
+        ParquetType: HasVersion + GetTimeStamp + HasParquetSchema,
+        for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+    {
+        if data.is_empty() {
+            debug!("Buffer is empty, skipping upload.");
+            return Ok(());
+        }
+
+        let writer = self
+            .parquet_type_to_writer
+            .get_mut(&parquet_type)
+            .context("Writer not found for specified parquet type")?;
+
+        let mut row_group_writer = writer.next_row_group().context("Failed to get row group")?;
+
+        data.write_to_row_group(&mut row_group_writer)
+            .context("Failed to write to row group")?;
+
+        row_group_writer
+            .close()
+            .context("Failed to close row group")?;
+
+        let old_writer = self
+            .close_writer(parquet_type)
+            .context("Failed to close writer")?;
+        let upload_buffer = old_writer
+            .into_inner()
+            .context("Failed to get inner buffer")?;
+
+        let bucket_root = PathBuf::from(&self.bucket_root);
+        upload_parquet_to_gcs(
+            &self.gcs_client,
+            upload_buffer,
+            table_name,
+            &self.bucket_name,
+            &bucket_root,
+            self.processor_name.clone(),
+        )
+        .await?;
+
+        Ok(())
+    }
+}
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
index 35ba1ba68..755931530 100644
--- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -1 +1,4 @@
+pub mod gcs_handler;
+
 pub mod parquet_default_extractor;
+pub mod size_buffer;
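`Uploadable` is the seam that keeps the buffering step unaware of GCS. A sketch of how a mock sink in the same shape could back a unit test (the trait and buffer type here are simplified stand-ins rather than the crate's own definitions, and it assumes the existing async-trait and tokio dependencies):

    // Illustrative only: a recording sink with the same shape as `Uploadable`,
    // so buffering logic can be exercised without a real GCS client.
    use async_trait::async_trait;

    #[derive(Debug, Clone)]
    struct FakeBuffer {
        rows: Vec<String>,
    }

    #[async_trait]
    trait BufferSink {
        async fn handle_buffer(&mut self, buffer: FakeBuffer) -> Result<(), String>;
    }

    #[derive(Default)]
    struct RecordingSink {
        uploads: Vec<usize>, // number of rows per "upload"
    }

    #[async_trait]
    impl BufferSink for RecordingSink {
        async fn handle_buffer(&mut self, buffer: FakeBuffer) -> Result<(), String> {
            self.uploads.push(buffer.rows.len());
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() {
        let mut sink = RecordingSink::default();
        sink.handle_buffer(FakeBuffer { rows: vec!["a".into(), "b".into()] })
            .await
            .unwrap();
        assert_eq!(sink.uploads, vec![2]);
    }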
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
new file mode 100644
index 000000000..7ac2ba8f9
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
@@ -0,0 +1,260 @@
+use crate::{
+    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
+    steps::parquet_default_processor::gcs_handler::Uploadable,
+};
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::{TransactionContext, TransactionMetadata},
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use std::{collections::HashMap, time::Duration};
+use tracing::debug;
+
+struct ParquetBuffer {
+    pub buffer: ParquetTypeStructs,
+    pub buffer_size_bytes: usize,
+    current_batch_metadata: TransactionMetadata,
+}
+
+impl ParquetBuffer {
+    fn new(parquet_type: &ParquetTypeEnum) -> Self {
+        Self {
+            buffer: ParquetTypeStructs::default_for_type(parquet_type),
+            buffer_size_bytes: 0,
+            current_batch_metadata: TransactionMetadata::default(),
+        }
+    }
+
+    pub fn update_current_batch_metadata(
+        &mut self,
+        cur_batch_metadata: &TransactionMetadata,
+        uploaded: bool,
+    ) {
+        let buffer_metadata = &mut self.current_batch_metadata;
+        buffer_metadata.end_version = cur_batch_metadata.end_version;
+        buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
+        buffer_metadata.end_transaction_timestamp =
+            cur_batch_metadata.end_transaction_timestamp.clone();
+        // if it wasn't uploaded -> we update only end_version, size, and last timestamp
+        if uploaded {
+            buffer_metadata.start_version = cur_batch_metadata.start_version;
+            buffer_metadata.start_transaction_timestamp =
+                cur_batch_metadata.start_transaction_timestamp.clone();
+        }
+    }
+}
+
+pub struct SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
+    pub internal_buffer_size_bytes: usize,
+    pub poll_interval: Duration,
+    pub buffer_uploader: U,
+    pub buffer_max_size: usize,
+}
+
+impl<U> SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
+        Self {
+            internal_buffers: HashMap::new(),
+            internal_buffer_size_bytes: 0,
+            poll_interval,
+            buffer_uploader,
+            buffer_max_size,
+        }
+    }
+
+    fn append_to_buffer(
+        buffer: &mut ParquetBuffer,
+        parquet_data: ParquetTypeStructs,
+    ) -> Result<(), ProcessorError> {
+        buffer.buffer_size_bytes += parquet_data.calculate_size();
+        buffer.buffer.append(parquet_data)
+    }
+
+    async fn handle_buffer_append(
+        &mut self,
+        parquet_type: ParquetTypeEnum,
+        parquet_data: ParquetTypeStructs,
+        cur_batch_metadata: &TransactionMetadata,
+        upload_metadata_map: &mut HashMap<ParquetTypeEnum, TransactionMetadata>,
+    ) -> Result<(), ProcessorError> {
+        let mut file_uploaded = false;
+
+        // Get or initialize the buffer for the specific ParquetTypeEnum
+        let buffer = self
+            .internal_buffers
+            .entry(parquet_type)
+            .or_insert_with(|| {
+                debug!(
+                    "Initializing buffer for ParquetTypeEnum: {:?}",
+                    parquet_type,
+                );
+                ParquetBuffer::new(&parquet_type)
+            });
+
+        let curr_batch_size_bytes = parquet_data.calculate_size();
+
+        debug!(
+            "Current batch size for {:?}: {} bytes, buffer size before append: {} bytes",
+            parquet_type, curr_batch_size_bytes, buffer.buffer_size_bytes,
+        );
+
+        // If the current buffer size + new batch exceeds max size, upload the buffer
+        if buffer.buffer_size_bytes + curr_batch_size_bytes > self.buffer_max_size {
+            debug!(
+                "Buffer size {} + batch size {} exceeds max size {}. Uploading buffer for {:?}.",
+                buffer.buffer_size_bytes, curr_batch_size_bytes, self.buffer_max_size, parquet_type
+            );
+
+            // Take the current buffer to upload and reset the buffer in place
+            let struct_buffer = std::mem::replace(
+                &mut buffer.buffer,
+                ParquetTypeStructs::default_for_type(&parquet_type),
+            );
+            self.buffer_uploader.handle_buffer(struct_buffer).await?;
+            file_uploaded = true;
+
+            // update this metadata before insert
+            upload_metadata_map.insert(parquet_type, buffer.current_batch_metadata.clone());
+            buffer.buffer_size_bytes = 0; // Reset buffer size
+        }
+
+        // Append new data to the buffer
+        Self::append_to_buffer(buffer, parquet_data)?;
+        buffer.update_current_batch_metadata(cur_batch_metadata, file_uploaded);
+        debug!(
+            "Updated buffer size for {:?}: {} bytes",
+            parquet_type, buffer.buffer_size_bytes,
+        );
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<U> Processable for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+    type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
+    type RunType = PollableAsyncRunType;
+
+    async fn process(
+        &mut self,
+        item: TransactionContext<Self::Input>,
+    ) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
+        debug!("Starting process for {} data items", item.data.len());
+
+        let mut upload_metadata_map = HashMap::new();
+        for (parquet_type, parquet_data) in item.data {
+            self.handle_buffer_append(
+                parquet_type,
+                parquet_data,
+                &item.metadata,
+                &mut upload_metadata_map,
+            )
+            .await?;
+        }
+
+        if !upload_metadata_map.is_empty() {
+            return Ok(Some(TransactionContext {
+                data: upload_metadata_map,
+                metadata: item.metadata, // cur_batch_metadata,
+            }));
+        }
+        Ok(None)
+    }
+
+    async fn cleanup(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
+        let mut metadata_map = HashMap::new();
+        debug!("Starting cleanup: uploading all remaining buffers.");
+        for (parquet_type, mut buffer) in self.internal_buffers.drain() {
+            if buffer.buffer_size_bytes > 0 {
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+
+                let mut metadata = buffer.current_batch_metadata.clone();
+                metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
+                metadata_map.insert(parquet_type, metadata);
+            }
+        }
+        self.internal_buffers.clear();
+
+        debug!("Cleanup complete: all buffers uploaded.");
+        if !metadata_map.is_empty() {
+            return Ok(Some(vec![TransactionContext {
+                data: metadata_map,
+                metadata: TransactionMetadata::default(),
+            }]));
+        }
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl<U> PollableAsyncStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn poll_interval(&self) -> Duration {
+        self.poll_interval
+    }
+
+    async fn poll(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
+        let mut metadata_map = HashMap::new();
+        debug!("Polling to check if any buffers need uploading.");
+
+        for (parquet_type, mut buffer) in self.internal_buffers.drain() {
+            if buffer.buffer_size_bytes > 0 {
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+
+                let metadata = buffer.current_batch_metadata.clone();
+                metadata_map.insert(parquet_type, metadata);
+
+                // Reset ParquetBuffer or maybe remove?
+                buffer.buffer_size_bytes = 0;
+                buffer.current_batch_metadata = TransactionMetadata::default();
+            }
+        }
+
+        if !metadata_map.is_empty() {
+            return Ok(Some(vec![TransactionContext {
+                data: metadata_map,
+                metadata: TransactionMetadata::default(),
+            }]));
+        }
+        Ok(None)
+    }
+}
+
+impl<U> NamedStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn name(&self) -> String {
+        "DefaultTimedSizeBuffer".to_string()
+    }
+}
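For reference, the flush rule SizeBufferStep applies in handle_buffer_append: upload the existing buffer first whenever appending the incoming batch would push it past max_buffer_size, and let the timed poll()/cleanup() paths catch whatever remains. The same policy in isolation (illustrative names, not the step's real types):

    // Illustrative only: the size-threshold decision, stripped of uploading and metadata.
    struct FlushPolicy {
        buffered_bytes: usize,
        max_size: usize,
    }

    impl FlushPolicy {
        fn new(max_size: usize) -> Self {
            Self { buffered_bytes: 0, max_size }
        }

        // Returns true when the caller should upload the current buffer *before*
        // appending the incoming batch, mirroring handle_buffer_append above.
        fn should_flush_before(&self, incoming_bytes: usize) -> bool {
            self.buffered_bytes + incoming_bytes > self.max_size
        }

        fn record_append(&mut self, incoming_bytes: usize) {
            self.buffered_bytes += incoming_bytes;
        }

        fn reset(&mut self) {
            self.buffered_bytes = 0;
        }
    }

    fn main() {
        let mut policy = FlushPolicy::new(100);
        policy.record_append(80);
        assert!(!policy.should_flush_before(10)); // 90 <= 100, keep buffering
        policy.record_append(10);
        assert!(policy.should_flush_before(20)); // 110 > 100, upload first
        policy.reset();
    }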