diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 51f288ca2..7842ea261 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -4181,6 +4181,7 @@ name = "sdk-processor"
 version = "0.1.0"
 dependencies = [
  "ahash",
+ "allocative",
  "anyhow",
  "aptos-indexer-processor-sdk",
  "aptos-indexer-processor-sdk-server-framework",
@@ -4196,12 +4197,15 @@ dependencies = [
  "field_count",
  "futures",
  "futures-util",
+ "google-cloud-storage",
  "hex",
  "jemallocator",
  "kanal",
  "lazy_static",
  "native-tls",
  "num_cpus",
+ "parquet",
+ "parquet_derive",
  "postgres-native-tls",
  "processor",
  "rayon",
diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml
index 6859a36df..06dbfea92 100644
--- a/rust/sdk-processor/Cargo.toml
+++ b/rust/sdk-processor/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 
 [dependencies]
 ahash = { workspace = true }
+allocative = { workspace = true }
 anyhow = { workspace = true }
 aptos-indexer-processor-sdk = { workspace = true }
 aptos-indexer-processor-sdk-server-framework = { workspace = true }
@@ -21,11 +22,20 @@ diesel_migrations = { workspace = true }
 field_count = { workspace = true }
 futures = { workspace = true }
 futures-util = { workspace = true }
+
+google-cloud-storage = { workspace = true }
 hex = { workspace = true }
 jemallocator = { workspace = true }
 kanal = { workspace = true }
 lazy_static = { workspace = true }
+
+# Postgres SSL support
+native-tls = { workspace = true }
 num_cpus = { workspace = true }
+# Parquet support
+parquet = { workspace = true }
+parquet_derive = { workspace = true }
+postgres-native-tls = { workspace = true }
 processor = { workspace = true }
 rayon = { workspace = true }
 serde = { workspace = true }
@@ -34,14 +44,10 @@ sha2 = { workspace = true }
 strum = { workspace = true }
 tiny-keccak = { workspace = true }
 tokio = { workspace = true }
+tokio-postgres = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }
 
-# Postgres SSL support
-native-tls = { workspace = true }
-postgres-native-tls = { workspace = true }
-tokio-postgres = { workspace = true }
-
 [features]
 libpq = ["diesel/postgres"]
 # When using the default features we enable the diesel/postgres feature.
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index b7ec20f35..50bc4254b 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -144,10 +144,8 @@ pub struct ParquetDefaultProcessorConfig {
     pub bucket_name: String,
     #[serde(default)]
     pub bucket_root: String,
-    #[serde(
-        default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size"
-    )]
-    pub parquet_handler_response_channel_size: usize,
+    #[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
+    pub channel_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
     pub max_buffer_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
@@ -160,7 +158,7 @@ pub struct ParquetDefaultProcessorConfig {
 impl ParquetDefaultProcessorConfig {
     /// Make the default very large on purpose so that by default it's not chunked
     /// This prevents any unexpected changes in behavior
-    pub const fn default_parquet_handler_response_channel_size() -> usize {
+    pub const fn default_channel_size() -> usize {
         100_000
     }
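The rename above keeps the same 100_000 default, only the field and its default function change. A minimal standalone sketch (simplified stand-in struct, serde_json used purely for illustration; not the real config type) of how the renamed field behaves when the key is omitted. Note that existing configs still using the old `parquet_handler_response_channel_size` key will need to be updated, since no serde alias is kept:

// Sketch only: trimmed-down stand-in for ParquetDefaultProcessorConfig.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ParquetConfigSketch {
    bucket_name: String,
    // Same pattern as the real struct: fall back to a default fn when the key is absent.
    #[serde(default = "default_channel_size")]
    channel_size: usize,
}

const fn default_channel_size() -> usize {
    100_000
}

fn main() {
    // `channel_size` omitted -> serde falls back to 100_000.
    let cfg: ParquetConfigSketch =
        serde_json::from_str(r#"{ "bucket_name": "my-bucket" }"#).unwrap();
    println!("bucket: {}", cfg.bucket_name);
    assert_eq!(cfg.channel_size, 100_000);
}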
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index 96094b44c..6293aeb69 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -3,6 +3,12 @@ use crate::{
         db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
         processor_config::ProcessorConfig,
     },
+    parquet_processors::ParquetTypeEnum,
+    steps::parquet_default_processor::{
+        gcs_handler::{create_new_writer, GCSUploader},
+        parquet_default_extractor::ParquetDefaultExtractor,
+        size_buffer::SizeBufferStep,
+    },
     utils::{
         chain_id::check_or_update_chain_id,
         database::{new_db_pool, run_migrations, ArcDbPool},
@@ -10,8 +16,25 @@ use crate::{
     },
 };
 use aptos_indexer_processor_sdk::{
-    aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
+    aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
+    builder::ProcessorBuilder,
+    common_steps::TransactionStreamStep,
+    traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
+};
+use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
+use parquet::schema::types::Type;
+use processor::{
+    bq_analytics::generic_parquet_processor::HasParquetSchema,
+    db::common::models::default_models::{
+        parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
+        parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
+        parquet_write_set_changes::WriteSetChangeModel,
+    },
 };
+use std::{collections::HashMap, sync::Arc, time::Duration};
+use tracing::{debug, info};
+
+const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
 
 pub struct ParquetDefaultProcessor {
     pub config: IndexerProcessorConfig,
@@ -64,9 +87,10 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         // Determine the processing mode (backfill or regular)
         let is_backfill = self.config.backfill_config.is_some();
 
+        // TODO: Revisit when parquet version tracker is available.
         // Query the starting version
-        let _starting_version = if is_backfill {
-            get_starting_version(&self.config, self.db_pool.clone()).await?;
+        let starting_version = if is_backfill {
+            get_starting_version(&self.config, self.db_pool.clone()).await?
         } else {
             // Regular mode logic: Fetch the minimum last successful version across all relevant tables
             let table_names = self
@@ -79,8 +103,9 @@ impl ProcessorTrait for ParquetDefaultProcessor {
                     self.config.processor_config.name()
                 )
             })?;
+
             get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
-                .await?;
+                .await?
         };
 
         // Check and update the ledger chain id to ensure we're indexing the correct chain
@@ -90,7 +115,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             .await?;
         check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
 
-        let _parquet_processor_config = match self.config.processor_config.clone() {
+        let parquet_processor_config = match self.config.processor_config.clone() {
             ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => {
                 parquet_processor_config
             },
@@ -102,7 +127,92 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             },
         };
 
-        // Define processor steps
-        Ok(())
+        // Define processor transaction stream config
+        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
+            starting_version: Some(starting_version),
+            ..self.config.transaction_stream_config.clone()
+        })
+        .await?;
+
+        let parquet_default_extractor = ParquetDefaultExtractor {
+            opt_in_tables: None, // : parquet_processor_config.tables.iter().map(|s| s.to_string()).collect(),
+        };
+
+        let credentials = parquet_processor_config
+            .google_application_credentials
+            .clone();
+
+        if let Some(credentials) = credentials {
+            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+        }
+
+        let gcs_config = GcsClientConfig::default()
+            .with_auth()
+            .await
+            .expect("Failed to create GCS client config");
+
+        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+
+        let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
+            (ParquetTypeEnum::MoveResource, MoveResource::schema()),
+            (
+                ParquetTypeEnum::WriteSetChange,
+                WriteSetChangeModel::schema(),
+            ),
+            (ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
+            (ParquetTypeEnum::TableItem, TableItem::schema()),
+            (ParquetTypeEnum::MoveModule, MoveModule::schema()),
+        ]
+        .into_iter()
+        .collect();
+
+        let parquet_type_to_writer = parquet_type_to_schemas
+            .iter()
+            .map(|(key, schema)| {
+                let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
+                (*key, writer)
+            })
+            .collect();
+
+        let buffer_uploader = GCSUploader::new(
+            gcs_client.clone(),
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            parquet_processor_config.bucket_name.clone(),
+            parquet_processor_config.bucket_root.clone(),
+            self.name().to_string(),
+        )?;
+
+        let channel_size = parquet_processor_config.channel_size;
+
+        let default_size_buffer_step = SizeBufferStep::new(
+            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+            buffer_uploader,
+            parquet_processor_config.max_buffer_size,
+        );
+
+        // Connect processor steps together
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+            transaction_stream.into_runnable_step(),
+        )
+        .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
+        .connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
+
+        // (Optional) Parse the results
+        loop {
+            match buffer_receiver.recv().await {
+                Ok(txn_context) => {
+                    debug!(
+                        "Finished processing versions [{:?}, {:?}]",
+                        txn_context.metadata.start_version, txn_context.metadata.end_version,
+                    );
+                },
+                Err(e) => {
+                    info!("No more transactions in channel: {:?}", e);
+                    break Ok(());
+                },
+            }
+        }
     }
 }
diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs
index 3dd4c9776..f41861537 100644
--- a/rust/sdk-processor/src/steps/mod.rs
+++ b/rust/sdk-processor/src/steps/mod.rs
@@ -5,7 +5,7 @@ pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
 pub mod objects_processor;
-mod parquet_default_processor;
+pub mod parquet_default_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
 pub mod user_transaction_processor;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
new file mode 100644
index 000000000..6a94ce64d
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
@@ -0,0 +1,185 @@
+use crate::parquet_processors::{ParquetStruct, ParquetTypeEnum, ParquetTypeStructs};
+use anyhow::Context;
+use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
+use async_trait::async_trait;
+use google_cloud_storage::client::Client as GCSClient;
+use parquet::{
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    record::RecordWriter,
+    schema::types::Type,
+};
+use processor::bq_analytics::{
+    gcs_handler::upload_parquet_to_gcs,
+    generic_parquet_processor::{GetTimeStamp, HasParquetSchema, HasVersion},
+};
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
+use tracing::{debug, error};
+
+pub struct GCSUploader {
+    gcs_client: Arc<GCSClient>,
+    parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+    parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub processor_name: String,
+}
+
+#[async_trait]
+pub trait Uploadable {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError>;
+}
+
+#[async_trait]
+impl Uploadable for GCSUploader {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError> {
+        // Directly call `upload_generic` for each buffer type
+        let table_name = buffer.get_table_name();
+
+        let result = match buffer {
+            ParquetTypeStructs::Transaction(transactions) => {
+                self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name)
+                    .await
+            },
+            ParquetTypeStructs::MoveResource(resources) => {
+                self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name)
+                    .await
+            },
+            ParquetTypeStructs::WriteSetChange(changes) => {
+                self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name)
+                    .await
+            },
+            ParquetTypeStructs::TableItem(items) => {
+                self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name)
+                    .await
+            },
+            ParquetTypeStructs::MoveModule(modules) => {
+                self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name)
+                    .await
+            },
+        };
+
+        if let Err(e) = result {
+            error!("Failed to upload buffer: {}", e);
+            return Err(ProcessorError::ProcessError {
+                message: format!("Failed to upload buffer: {}", e),
+            });
+        }
+        Ok(())
+    }
+}
+
+pub fn create_new_writer(schema: Arc<Type>) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+    let props = WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::LZ4)
+        .build();
+    let props_arc = Arc::new(props);
+
+    SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
+}
+
+impl GCSUploader {
+    pub fn new(
+        gcs_client: Arc<GCSClient>,
+        parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+        parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+        bucket_name: String,
+        bucket_root: String,
+        processor_name: String,
+    ) -> anyhow::Result<Self> {
+        Ok(Self {
+            gcs_client,
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            bucket_name,
+            bucket_root,
+            processor_name,
+        })
+    }
+
+    fn create_new_writer(
+        &self,
+        parquet_type: ParquetTypeEnum,
+    ) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+        let schema = self
+            .parquet_type_to_schemas
+            .get(&parquet_type)
+            .context("Parquet type not found in schemas")?
+            .clone();
+
+        create_new_writer(schema)
+    }
+
+    fn close_writer(
+        &mut self,
+        parquet_type: ParquetTypeEnum,
+    ) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+        let old_writer = self
+            .parquet_type_to_writer
+            .remove(&parquet_type)
+            .context("Writer for specified Parquet type not found")?;
+
+        // Create a new writer and replace the old writer with it
+        let new_writer = self.create_new_writer(parquet_type)?;
+        self.parquet_type_to_writer.insert(parquet_type, new_writer);
+
+        // Return the old writer so its contents can be used
+        Ok(old_writer)
+    }
+
+    // Generic upload function to handle any data type
+    async fn upload_generic<ParquetType>(
+        &mut self,
+        data: &[ParquetType],
+        parquet_type: ParquetTypeEnum,
+        table_name: &'static str,
+    ) -> anyhow::Result<()>
+    where
+        ParquetType: ParquetStruct + HasVersion + GetTimeStamp + HasParquetSchema,
+        for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+    {
+        if data.is_empty() {
+            debug!("Buffer is empty, skipping upload.");
+            return Ok(());
+        }
+
+        let writer = self
+            .parquet_type_to_writer
+            .get_mut(&parquet_type)
+            .context("Writer not found for specified parquet type")?;
+
+        let mut row_group_writer = writer.next_row_group().context("Failed to get row group")?;
+
+        data.write_to_row_group(&mut row_group_writer)
+            .context("Failed to write to row group")?;
+
+        row_group_writer
+            .close()
+            .context("Failed to close row group")?;
+
+        let old_writer = self
+            .close_writer(parquet_type)
+            .context("Failed to close writer")?;
+        let upload_buffer = old_writer
+            .into_inner()
+            .context("Failed to get inner buffer")?;
+
+        let bucket_root = PathBuf::from(&self.bucket_root);
+        upload_parquet_to_gcs(
+            &self.gcs_client,
+            upload_buffer,
+            table_name,
+            &self.bucket_name,
+            &bucket_root,
+            self.processor_name.clone(),
+        )
+        .await?;
+
+        Ok(())
+    }
+}
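The writer handling above follows a write-then-swap pattern: rows are written into an in-memory SerializedFileWriter<Vec<u8>>, the writer is swapped out for a fresh one via close_writer, and the finished bytes are uploaded. A self-contained sketch of that round trip with a hypothetical ExampleRow type (not part of this change; parquet_derive generates the RecordWriter impl for slices of the struct):

// Sketch only: hypothetical row type, same write/close/into_inner sequence as upload_generic.
use parquet::{
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    record::RecordWriter,
};
use parquet_derive::ParquetRecordWriter;
use std::sync::Arc;

#[derive(ParquetRecordWriter)]
struct ExampleRow {
    txn_version: i64,
    payload: String,
}

fn write_rows_to_bytes(rows: &[ExampleRow]) -> anyhow::Result<Vec<u8>> {
    // Schema comes from the derived RecordWriter impl, mirroring HasParquetSchema in the real code.
    let schema = rows.schema()?;
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;

    let mut row_group = writer.next_row_group()?;
    rows.write_to_row_group(&mut row_group)?; // same call GCSUploader makes
    row_group.close()?;

    // Finalize the file and take the underlying Vec<u8>; in the real code this buffer
    // is what gets handed to upload_parquet_to_gcs.
    Ok(writer.into_inner()?)
}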
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
index 35ba1ba68..755931530 100644
--- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -1 +1,4 @@
+pub mod gcs_handler;
+
 pub mod parquet_default_extractor;
+pub mod size_buffer;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
new file mode 100644
index 000000000..c67126347
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
@@ -0,0 +1,277 @@
+use crate::{
+    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
+    steps::parquet_default_processor::gcs_handler::Uploadable,
+};
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::{TransactionContext, TransactionMetadata},
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use std::{collections::HashMap, time::Duration};
+use tracing::debug;
+
+struct ParquetBuffer {
+    pub buffer: ParquetTypeStructs,
+    pub buffer_size_bytes: usize,
+    current_batch_metadata: TransactionMetadata,
+}
+
+impl ParquetBuffer {
+    pub fn update_current_batch_metadata(
+        &mut self,
+        cur_batch_metadata: &TransactionMetadata,
+        uploaded: bool,
+    ) {
+        let buffer_metadata = &mut self.current_batch_metadata;
+        buffer_metadata.end_version = cur_batch_metadata.end_version;
+        buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
+        buffer_metadata.end_transaction_timestamp =
+            cur_batch_metadata.end_transaction_timestamp.clone();
+        // If the buffer was not uploaded, we only update the end version, size, and end timestamp;
+        // the start fields are refreshed only after an upload.
+        if uploaded {
+            buffer_metadata.start_version = cur_batch_metadata.start_version;
+            buffer_metadata.start_transaction_timestamp =
+                cur_batch_metadata.start_transaction_timestamp.clone();
+        }
+    }
+}
+
+pub struct SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
+    pub internal_buffer_size_bytes: usize,
+    pub poll_interval: Duration,
+    pub buffer_uploader: U,
+    pub buffer_max_size: usize,
+}
+
+impl<U> SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
+        Self {
+            internal_buffers: HashMap::new(),
+            internal_buffer_size_bytes: 0,
+            poll_interval,
+            buffer_uploader,
+            buffer_max_size,
+        }
+    }
+
+    fn calculate_batch_size(parquet_data: &ParquetTypeStructs) -> usize {
+        match parquet_data {
+            ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data),
+            ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data),
+        }
+    }
+
+    fn append_to_buffer(
+        buffer: &mut ParquetBuffer,
+        parquet_data: ParquetTypeStructs,
+    ) -> Result<(), ProcessorError> {
+        buffer.buffer_size_bytes += Self::calculate_batch_size(&parquet_data);
+        match (&mut buffer.buffer, parquet_data) {
+            (ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => {
+                buf.append(&mut data)
+            },
+            (
+                ParquetTypeStructs::WriteSetChange(buf),
+                ParquetTypeStructs::WriteSetChange(mut data),
+            ) => buf.append(&mut data),
+            (ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => {
+                buf.append(&mut data)
+            },
+            (ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => {
+                buf.append(&mut data)
+            },
+            (ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => {
+                buf.append(&mut data)
+            },
+            _ => {
+                return Err(ProcessorError::ProcessError {
+                    message: "Failed to upload buffer".to_string(),
+                })
+            },
+        };
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<U> Processable for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+    type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
+    type RunType = PollableAsyncRunType;
+
+    async fn process(
+        &mut self,
+        item: TransactionContext<Self::Input>,
+    ) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
+        debug!("Starting process for {} data items", item.data.len());
+
+        let mut parquet_types = item.data;
+        let cur_batch_metadata = item.metadata;
+        let mut file_uploaded = false;
+        let mut upload_metadata_map = HashMap::new();
+
+        for (parquet_type, parquet_data) in parquet_types.drain() {
+            // Get or initialize the buffer for the specific ParquetTypeEnum
+            let buffer = self
+                .internal_buffers
+                .entry(parquet_type)
+                .or_insert_with(|| {
+                    debug!(
+                        "Initializing buffer for ParquetTypeEnum: {:?}",
+                        parquet_type
+                    );
+                    ParquetBuffer {
+                        buffer: ParquetTypeStructs::default_for_type(&parquet_type),
+                        buffer_size_bytes: 0,
+                        current_batch_metadata: TransactionMetadata::default(),
+                    }
+                });
+
+            let curr_batch_size_bytes = Self::calculate_batch_size(&parquet_data);
+
+            debug!(
+                "Current batch size for {:?}: {} bytes, buffer size before append: {} bytes",
+                parquet_type, curr_batch_size_bytes, buffer.buffer_size_bytes
+            );
+
+            // If the current buffer size + new batch exceeds max size, upload the buffer
+            if buffer.buffer_size_bytes + curr_batch_size_bytes > self.buffer_max_size {
+                debug!(
+                    "Buffer size {} + batch size {} exceeds max size {}. Uploading buffer for {:?}.",
+                    buffer.buffer_size_bytes,
+                    curr_batch_size_bytes,
+                    self.buffer_max_size,
+                    parquet_type
+                );
+
+                // Take the current buffer to upload and reset the buffer in place
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                file_uploaded = true;
+
+                // Update this metadata before insert
+                let metadata = buffer.current_batch_metadata.clone();
+                upload_metadata_map.insert(parquet_type, metadata);
+                buffer.buffer_size_bytes = 0; // Reset buffer size
+            }
+
+            // Append new data to the buffer
+            Self::append_to_buffer(buffer, parquet_data)?;
+            buffer.update_current_batch_metadata(&cur_batch_metadata, file_uploaded);
+            debug!(
+                "Updated buffer size for {:?}: {} bytes",
+                parquet_type, buffer.buffer_size_bytes
+            );
+        }
+
+        if !upload_metadata_map.is_empty() {
+            return Ok(Some(TransactionContext {
+                data: upload_metadata_map,
+                metadata: cur_batch_metadata,
+            }));
+        }
+        Ok(None)
+    }
+
+    async fn cleanup(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
+        let mut metadata_map = HashMap::new();
+        debug!("Starting cleanup: uploading all remaining buffers.");
+        for (parquet_type, mut buffer) in self.internal_buffers.drain() {
+            if buffer.buffer_size_bytes > 0 {
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+
+                let mut metadata = buffer.current_batch_metadata.clone();
+                metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
+                metadata_map.insert(parquet_type, metadata);
+            }
+        }
+
+        debug!("Cleanup complete: all buffers uploaded.");
+        if !metadata_map.is_empty() {
+            return Ok(Some(vec![TransactionContext {
+                data: metadata_map,
+                metadata: TransactionMetadata::default(),
+            }]));
+        }
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl<U> PollableAsyncStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn poll_interval(&self) -> Duration {
+        self.poll_interval
+    }
+
+    async fn poll(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
+        let mut metadata_map = HashMap::new();
+        debug!("Polling to check if any buffers need uploading.");
+
+        for (parquet_type, mut buffer) in self.internal_buffers.drain() {
+            if buffer.buffer_size_bytes > 0 {
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+
+                let metadata = buffer.current_batch_metadata.clone();
+                metadata_map.insert(parquet_type, metadata);
+
+                // Reset ParquetBuffer or maybe remove?
+                buffer.buffer_size_bytes = 0;
+                buffer.current_batch_metadata = TransactionMetadata::default();
+            }
+        }
+
+        if !metadata_map.is_empty() {
+            return Ok(Some(vec![TransactionContext {
+                data: metadata_map,
+                metadata: TransactionMetadata::default(),
+            }]));
+        }
+        Ok(None)
+    }
+}
+
+impl<U> NamedStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn name(&self) -> String {
+        "DefaultTimedSizeBuffer".to_string()
+    }
+}
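Since SizeBufferStep only talks to GCS through the Uploadable trait, its size- and interval-based flush behavior can be unit-tested without real credentials. A hedged sketch of such a test helper (hypothetical MockUploader, e.g. inside a #[cfg(test)] module in this crate; it simply counts how many buffers it was asked to upload):

// Sketch only: mock Uploadable for exercising SizeBufferStep in tests.
use crate::{
    parquet_processors::ParquetTypeStructs,
    steps::parquet_default_processor::{gcs_handler::Uploadable, size_buffer::SizeBufferStep},
};
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use async_trait::async_trait;
use std::time::Duration;

struct MockUploader {
    pub upload_calls: usize,
}

#[async_trait]
impl Uploadable for MockUploader {
    async fn handle_buffer(
        &mut self,
        _buffer: ParquetTypeStructs,
    ) -> anyhow::Result<(), ProcessorError> {
        // Record the flush instead of uploading to GCS.
        self.upload_calls += 1;
        Ok(())
    }
}

// Usage: a step that flushes once a per-type buffer crosses ~10 KB, or on the 5-second poll.
fn make_test_step() -> SizeBufferStep<MockUploader> {
    SizeBufferStep::new(
        Duration::from_secs(5),
        MockUploader { upload_calls: 0 },
        10 * 1024,
    )
}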