diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index 1b1ab6f98..cc2b424dc 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -32,6 +32,14 @@ use processor::db::common::models::default_models::{
 };
 use std::{sync::Arc, time::Duration};
 use tracing::{debug, info};
+use std::collections::HashMap;
+use crate::parquet_processors::ParquetTypeEnum;
+use parquet::schema::types::Type;
+use crate::steps::parquet_default_processor::gcs_handler::create_new_writer;
+use crate::steps::parquet_default_processor::gcs_handler::GCSUploader;
+use processor::bq_analytics::generic_parquet_processor::HasParquetSchema;
+use crate::steps::parquet_default_processor::size_buffer::SizeBufferStep;
+
 const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
 
@@ -164,181 +172,42 @@ impl ProcessorTrait for ParquetDefaultProcessor {
 
         let gcs_client = Arc::new(GCSClient::new(gcs_config));
 
-        let generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
-            parquet_processor_config.bucket_name.clone(),
-            parquet_processor_config.bucket_root.clone(),
-            [ParquetTransaction::default()].as_slice().schema().unwrap(),
-            self.name().to_string(),
-        )
-        .unwrap();
-
-        let transaction_step = TimedSizeBufferStep::<
-            Input,
-            ParquetTransaction,
-            GenericParquetBufferHandler<ParquetTransaction>,
-        >::new(
-            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-            TableConfig {
-                table_name: "transactions".to_string(),
-                bucket_name: parquet_processor_config.bucket_name.clone(),
-                bucket_root: parquet_processor_config.bucket_root.clone(),
-                max_size: parquet_processor_config.max_buffer_size,
-            },
-            gcs_client.clone(),
-            self.name(),
-            generic_parquet_buffer_handler,
-        );
+        let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
+            (ParquetTypeEnum::MoveResource, MoveResource::schema()),
+            (ParquetTypeEnum::WriteSetChange, WriteSetChangeModel::schema()),
+            (ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
+            (ParquetTypeEnum::TableItem, TableItem::schema()),
+            (ParquetTypeEnum::MoveModule, MoveModule::schema()),
+        ].into_iter().collect();
 
-        let move_resource_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
-            parquet_processor_config.bucket_name.clone(),
-            parquet_processor_config.bucket_root.clone(),
-            [MoveResource::default()].as_slice().schema().unwrap(),
-            self.name().to_string(),
-        )
-        .unwrap();
+        let parquet_type_to_writer = parquet_type_to_schemas.iter().map(|(key, schema)| {
+            let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
+            (*key, writer)
+        }).collect();
 
-        let move_resource_step = TimedSizeBufferStep::<
-            Input,
-            MoveResource,
-            GenericParquetBufferHandler<MoveResource>,
-        >::new(
-            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-            TableConfig {
-                table_name: "move_resources".to_string(),
-                bucket_name: parquet_processor_config.bucket_name.clone(),
-                bucket_root: parquet_processor_config.bucket_root.clone(),
-                max_size: parquet_processor_config.max_buffer_size,
-            },
+        let buffer_uploader = GCSUploader::new(
             gcs_client.clone(),
-            self.name(),
-            move_resource_generic_parquet_buffer_handler,
-        );
-
-        let wsc_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
             parquet_processor_config.bucket_name.clone(),
             parquet_processor_config.bucket_root.clone(),
-            [WriteSetChangeModel::default()]
-                .as_slice()
-                .schema()
-                .unwrap(),
             self.name().to_string(),
-        )
-        .unwrap();
-
-        let write_set_change_step = TimedSizeBufferStep::<
-            Input,
-            WriteSetChangeModel,
-            GenericParquetBufferHandler<WriteSetChangeModel>,
-        >::new(
-            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-            TableConfig {
-                table_name: "write_set_changes".to_string(),
-                bucket_name: parquet_processor_config.bucket_name.clone(),
-                bucket_root: parquet_processor_config.bucket_root.clone(),
-                max_size: parquet_processor_config.max_buffer_size,
-            },
-            gcs_client.clone(),
-            self.name(),
-            wsc_generic_parquet_buffer_handler,
-        );
-
-        let table_items_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
-            parquet_processor_config.bucket_name.clone(),
-            parquet_processor_config.bucket_root.clone(),
-            [TableItem::default()].as_slice().schema().unwrap(),
-            self.name().to_string(),
-        )
-        .unwrap();
-
-        let table_item_step =
-            TimedSizeBufferStep::<Input, TableItem, GenericParquetBufferHandler<TableItem>>::new(
-                Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-                TableConfig {
-                    table_name: "table_items".to_string(),
-                    bucket_name: parquet_processor_config.bucket_name.clone(),
-                    bucket_root: parquet_processor_config.bucket_root.clone(),
-                    max_size: parquet_processor_config.max_buffer_size,
-                },
-                gcs_client.clone(),
-                self.name(),
-                table_items_generic_parquet_buffer_handler,
-            );
-
-        let move_module_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
-            parquet_processor_config.bucket_name.clone(),
-            parquet_processor_config.bucket_root.clone(),
-            [MoveModule::default()].as_slice().schema().unwrap(),
-            self.name().to_string(),
-        )
-        .unwrap();
-
-        let move_module_step =
-            TimedSizeBufferStep::<Input, MoveModule, GenericParquetBufferHandler<MoveModule>>::new(
-                Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-                TableConfig {
-                    table_name: "move_modules".to_string(),
-                    bucket_name: parquet_processor_config.bucket_name.clone(),
-                    bucket_root: parquet_processor_config.bucket_root.clone(),
-                    max_size: parquet_processor_config.max_buffer_size,
-                },
-                gcs_client.clone(),
-                self.name(),
-                move_module_generic_parquet_buffer_handler,
-            );
+        )?;
 
         let channel_size = parquet_processor_config.channel_size;
 
-        let builder = ProcessorBuilder::new_with_inputless_first_step(
-            transaction_stream.into_runnable_step(),
+        let default_size_buffer_step = SizeBufferStep::new(
+            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+            buffer_uploader
         );
-        let builder =
-            builder.connect_to(parquet_default_extractor.into_runnable_step(), channel_size);
-        let mut fanout_builder = builder.fanout_broadcast(5);
-
-        let (first_builder, first_output_receiver) = fanout_builder
-            .get_processor_builder()
-            .unwrap()
-            .connect_to(transaction_step.into_runnable_step(), channel_size)
-            .end_and_return_output_receiver(channel_size);
-
-        let (second_builder, second_output_receiver) = fanout_builder
-            .get_processor_builder()
-            .unwrap()
-            .connect_to(move_resource_step.into_runnable_step(), channel_size)
-            .end_and_return_output_receiver(channel_size);
-
-        let (third_builder, third_output_receiver) = fanout_builder
-            .get_processor_builder()
-            .unwrap()
-            .connect_to(write_set_change_step.into_runnable_step(), channel_size)
-            .end_and_return_output_receiver(channel_size);
-
-        let (fourth_builder, fourth_output_receiver) = fanout_builder
-            .get_processor_builder()
-            .unwrap()
-            .connect_to(table_item_step.into_runnable_step(), channel_size)
-            .end_and_return_output_receiver(channel_size);
-
-        let (fifth_builder, fifth_output_receiver) = fanout_builder
-            .get_processor_builder()
-            .unwrap()
-            .connect_to(move_module_step.into_runnable_step(), channel_size)
-            .end_and_return_output_receiver(channel_size);
-
-        let (_, buffer_receiver) = ProcessorBuilder::new_with_fanin_step_with_receivers(
-            vec![
-                (first_output_receiver, first_builder.graph),
-                (second_output_receiver, second_builder.graph),
-                (third_output_receiver, third_builder.graph),
-                (fourth_output_receiver, fourth_builder.graph),
-                (fifth_output_receiver, fifth_builder.graph),
-            ],
-            // TODO: Replace with parquet version tracker
-            RunnableAsyncStep::new(PassThroughStep::new_named("FaninStep".to_string())),
-            channel_size,
+        // Connect processor steps together
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+            transaction_stream.into_runnable_step(),
         )
-        .end_and_return_output_receiver(channel_size);
+        .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
+        .connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
 
         // (Optional) Parse the results
         loop {
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
new file mode 100644
index 000000000..bbc8e70a7
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
@@ -0,0 +1,164 @@
+use allocative::Allocative;
+use anyhow::Context;
+use anyhow::anyhow;
+use aptos_indexer_processor_sdk::{
+    common_steps::timed_size_buffer_step::BufferHandler,
+    traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable},
+    types::transaction_context::TransactionMetadata,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use google_cloud_storage::client::Client as GCSClient;
+use parquet::{
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    record::RecordWriter,
+    schema::types::Type,
+};
+use processor::{
+    bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp,
+};
+use std::{marker::PhantomData, path::PathBuf, sync::Arc};
+use tracing::{debug, error};
+use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs};
+use std::collections::HashMap;
+
+pub struct GCSUploader {
+    gcs_client: Arc<GCSClient>,
+    parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+    parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub processor_name: String,
+}
+
+#[async_trait]
+pub trait Uploadable {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError>;
+}
+
+#[async_trait]
+impl Uploadable for GCSUploader {
+    async fn handle_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<(), ProcessorError> {
+        if let Err(e) = self.upload_buffer(buffer).await {
+            error!("Failed to upload buffer: {}", e);
+            return Err(ProcessorError::ProcessError {
+                message: format!("Failed to upload buffer: {}", e),
+            });
+        }
+        Ok(())
+    }
+}
+
+pub fn create_new_writer(schema: Arc<Type>) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+    let props = WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::LZ4)
+        .build();
+    let props_arc = Arc::new(props);
+
+    SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
+}
+
+impl GCSUploader {
+    fn create_new_writer(
+        &self,
+        parquet_type: ParquetTypeEnum,
+    ) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+        let schema = self
+            .parquet_type_to_schemas
+            .get(&parquet_type)
+            .ok_or_else(|| anyhow!("Parquet type not found in schemas"))?;
+        create_new_writer(schema.clone())
+    }
+
+    // fn close_writer(&mut self) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
+    //     // let new_writer = self.create_new_writer()?;
+    //     // let old_writer = std::mem::replace(&mut self.writer, new_writer);
+    //     // Ok(old_writer)
+    // }
+
+    pub fn new(
+        gcs_client: Arc<GCSClient>,
+        parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+        parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
+        bucket_name: String,
+        bucket_root: String,
+        processor_name: String,
+    ) -> anyhow::Result<Self> {
+        Ok(Self {
+            gcs_client,
+            parquet_type_to_schemas,
+            parquet_type_to_writer,
+            bucket_name,
+            bucket_root,
+            processor_name,
+        })
+    }
+
+    async fn upload_buffer(
+        &mut self,
+        buffer: ParquetTypeStructs,
+    ) -> anyhow::Result<()> {
+        // let buffer = match buffer {
+        //     ParquetTypeStructs::TransactionStructs(buffer) => buffer,
+        //     _ => return Err(anyhow!("Buffer is not a transaction struct")),
+        // };
+        // if buffer.is_empty() {
+        //     debug!("Buffer is empty, skipping upload.");
+        //     return Ok(());
+        // }
+        // let structs = buffer.get_structs();
+        //
+        // let first = structs
+        //     .first()
+        //     .context("Buffer is not empty but has no first element")?;
+        // let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp());
+        // let start_version = first.version();
+        // let last = buffer
+        //     .last()
+        //     .context("Buffer is not empty but has no last element")?;
+        // let end_version = last.version();
+        // let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());
+        // let mut row_group_writer = self
+        //     .writer
+        //     .next_row_group()
+        //     .context("Failed to get row group")?;
+        //
+        // buffer
+        //     .as_slice()
+        //     .write_to_row_group(&mut row_group_writer)
+        //     .context("Failed to write to row group")?;
+        // row_group_writer
+        //     .close()
+        //     .context("Failed to close row group")?;
+        //
+        // let old_writer = self.close_writer().context("Failed to close writer")?;
+        // let upload_buffer = old_writer
+        //     .into_inner()
+        //     .context("Failed to get inner buffer")?;
+        //
+        // let bucket_root = PathBuf::from(&self.bucket_root);
+        //
+        // upload_parquet_to_gcs(
+        //     &gcs_client,
+        //     upload_buffer,
+        //     ParquetType::TABLE_NAME,
+        //     &self.bucket_name,
+        //     &bucket_root,
+        //     self.processor_name.clone(),
+        // )
+        // .await?;
+        //
+        // metadata.start_version = start_version as u64;
+        // metadata.end_version = end_version as u64;
+        // metadata.start_transaction_timestamp = Some(first_transaction_timestamp);
+        // metadata.end_transaction_timestamp = Some(last_transaction_timestamp);
+
+        Ok(())
+    }
+}
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
index 84618b56a..e5c461d06 100644
--- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -1,3 +1,5 @@
 pub mod generic_parquet_buffer_handler;
 pub mod parquet_default_extractor;
+pub mod size_buffer;
+pub mod gcs_handler;
 // pub mod timed_size_buffer;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
index 6cc985fe6..cdc50c344 100644
--- a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
@@ -1,3 +1,4 @@
+use crate::parquet_processors::ParquetTypeEnum;
 use ahash::AHashMap;
 use aptos_indexer_processor_sdk::{
     aptos_protos::transaction::v1::Transaction,
@@ -13,7 +14,10 @@ use processor::db::common::models::default_models::{
     parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
     parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
 };
+use std::collections::HashMap;
+use crate::parquet_processors::ParquetTypeStructs;
 
+/// Extracts parquet data from transactions, allowing optional selection of specific tables.
 pub struct ParquetDefaultExtractor
 where
     Self: Processable + Send + Sized + 'static,
@@ -21,62 +25,79 @@ where
     pub opt_in_tables: Option<HashSet<String>>,
 }
 
+impl ParquetDefaultExtractor {
+    fn add_if_opted_in(
+        &self,
+        map: &mut HashMap<ParquetTypeEnum, ParquetTypeStructs>,
+        enum_type: ParquetTypeEnum,
+        data: ParquetTypeStructs,
+    ) {
+        if let Some(ref opt_in_tables) = self.opt_in_tables {
+            let table_name = enum_type.to_string();
+            if opt_in_tables.contains(&table_name) {
+                map.insert(enum_type, data);
+            }
+        } else {
+            // If there's no opt-in table, include all data
+            map.insert(enum_type, data);
+        }
+    }
+}
+
+type ParquetTypeMap = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+
 #[async_trait]
 impl Processable for ParquetDefaultExtractor {
     type Input = Vec<Transaction>;
-    type Output = (
-        Vec<ParquetTransaction>,
-        Vec<MoveResource>,
-        Vec<WriteSetChangeModel>,
-        Vec<TableItem>,
-        Vec<MoveModule>,
-    );
+    type Output = ParquetTypeMap;
     type RunType = AsyncRunType;
 
     async fn process(
         &mut self,
         transactions: TransactionContext<Vec<Transaction>>,
-    ) -> anyhow::Result<
-        Option<
-            TransactionContext<(
-                Vec<ParquetTransaction>,
-                Vec<MoveResource>,
-                Vec<WriteSetChangeModel>,
-                Vec<TableItem>,
-                Vec<MoveModule>,
-            )>,
-        >,
-        ProcessorError,
-    > {
-        let backfill_mode = self.opt_in_tables.is_some();
+    ) -> anyhow::Result<Option<TransactionContext<ParquetTypeMap>>, ProcessorError> {
+        let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) =
+            process_transactions(transactions.data);
 
-        let (
-            mut move_resources,
-            mut write_set_changes,
-            mut parquet_transactions,
-            mut table_items,
-            mut move_modules,
-        ) = process_transactions(transactions.data);
+        // Print the size of each extracted data type
+        println!("Processed data sizes:");
+        println!("  - MoveResources: {}", move_resources.len());
+        println!("  - WriteSetChanges: {}", write_set_changes.len());
+        println!("  - ParquetTransactions: {}", parquet_transactions.len());
+        println!("  - TableItems: {}", table_items.len());
+        println!("  - MoveModules: {}", move_modules.len());
 
-        if backfill_mode {
-            clear_unselected_data(
-                &self.opt_in_tables,
-                &mut move_resources,
-                &mut write_set_changes,
-                &mut parquet_transactions,
-                &mut table_items,
-                &mut move_modules,
-            );
-        }
+        let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
 
+        // Populate the map based on opt-in tables
+        self.add_if_opted_in(
+            &mut map,
+            ParquetTypeEnum::MoveResource,
+            ParquetTypeStructs::MoveResource(move_resources),
+        );
+        self.add_if_opted_in(
+            &mut map,
+            ParquetTypeEnum::WriteSetChange,
+            ParquetTypeStructs::WriteSetChange(write_set_changes),
+        );
+        self.add_if_opted_in(
+            &mut map,
+            ParquetTypeEnum::Transaction,
+            ParquetTypeStructs::Transaction(parquet_transactions),
+        );
+        self.add_if_opted_in(
+            &mut map,
+            ParquetTypeEnum::TableItem,
+            ParquetTypeStructs::TableItem(table_items),
+        );
+        self.add_if_opted_in(
+            &mut map,
+            ParquetTypeEnum::MoveModule,
+            ParquetTypeStructs::MoveModule(move_modules),
+        );
+
+        println!("Map populated with data for the following tables: {:?}", map.keys().collect::<Vec<_>>());
 
         Ok(Some(TransactionContext {
-            data: (
-                parquet_transactions,
-                move_resources,
-                write_set_changes,
-                table_items,
-                move_modules,
-            ),
+            data: map,
             metadata: transactions.metadata,
         }))
     }
@@ -139,7 +160,7 @@ pub fn process_transactions(
             WriteSetChangeDetail::Resource(resource) => {
                 move_resources.push(resource);
             },
-            WriteSetChangeDetail::Table(item, _current_item, _) => {
+            WriteSetChangeDetail::Table(item, _, _) => {
                 table_items.push(item);
             },
         }
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
new file mode 100644
index 000000000..422b9840d
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/size_buffer.rs
@@ -0,0 +1,190 @@
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType,
+        NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::{TransactionContext, TransactionMetadata},
+    utils::errors::ProcessorError,
+};
+use anyhow::Result;
+use async_trait::async_trait;
+use std::time::Duration;
+use std::collections::HashMap;
+use crate::parquet_processors::ParquetTypeEnum;
+use crate::parquet_processors::ParquetTypeStructs;
+use crate::steps::parquet_default_processor::gcs_handler::Uploadable;
+
+struct ParquetBuffer {
+    pub buffer: ParquetTypeStructs,
+    pub buffer_size_bytes: usize,
+    pub max_size: usize,
+    current_batch_metadata: TransactionMetadata,
+    upload_interval: Duration, // not sure if needed here
+}
+
+pub struct SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    pub internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
+    pub internal_buffer_size_bytes: usize,
+    pub poll_interval: Duration,
+    pub buffer_uploader: U,
+}
+
+#[async_trait]
+impl<U> Processable for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+    type Output = ();
+    type RunType = PollableAsyncRunType;
+
+    async fn process(
+        &mut self,
+        item: TransactionContext<Self::Input>,
+    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
+        println!("Starting process for {} data items", item.data.len());
+
+        let mut parquet_types = item.data; // Extract the data from the input
+        let mut metadata = item.metadata;
+        let mut file_uploaded = false;
+
+        for (parquet_type, mut parquet_data) in parquet_types.drain() {
+            println!("Processing ParquetTypeEnum: {:?}", parquet_type);
+
+            // Get or initialize the buffer for the specific ParquetTypeEnum
+            let buffer = self
+                .internal_buffers
+                .entry(parquet_type.clone())
+                .or_insert_with(|| {
+                    println!("Initializing buffer for ParquetTypeEnum: {:?}", parquet_type);
+                    // TODO: revisit.
+                    ParquetBuffer {
+                        buffer: ParquetTypeStructs::default_for_type(&parquet_type),
+                        buffer_size_bytes: 0,
+                        max_size: self.internal_buffer_size_bytes, // maybe not needed?
+                        current_batch_metadata: metadata.clone(), // update this before upload
+                        upload_interval: self.poll_interval, // not sure if needed here
+                    }
+                });
+
+            // Calculate the current batch size; we can probably replace this with a Sizeable trait
+            let curr_batch_size_bytes: usize = match &parquet_data {
+                ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data),
+                ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data),
+                ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data),
+                ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data),
+                ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data),
+            };
+
+            println!(
+                "Current batch size for {:?}: {} bytes, buffer size before append: {} bytes",
+                parquet_type, curr_batch_size_bytes, buffer.buffer_size_bytes
+            );
+
+            // If the current buffer size + new batch exceeds max size, upload the buffer
+            if buffer.buffer_size_bytes + curr_batch_size_bytes > buffer.max_size {
+                println!(
+                    "Buffer size {} + batch size {} exceeds max size {}. Uploading buffer for {:?}.",
+                    buffer.buffer_size_bytes,
+                    curr_batch_size_bytes,
+                    buffer.max_size,
+                    parquet_type
+                );
+
+                // Take the current buffer to upload and reset the buffer in place
+                let struct_buffer = std::mem::replace(
+                    &mut buffer.buffer,
+                    ParquetTypeStructs::default_for_type(&parquet_type),
+                );
+                self.buffer_uploader.handle_buffer(struct_buffer).await?;
+                metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
+                buffer.buffer_size_bytes = 0; // Reset buffer size
+                file_uploaded = true;
+            }
+
+            // Append new data to the buffer
+            buffer.buffer_size_bytes += curr_batch_size_bytes;
+            match (&mut buffer.buffer, parquet_data) {
+                (ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => buf.append(&mut data),
+                (ParquetTypeStructs::WriteSetChange(buf), ParquetTypeStructs::WriteSetChange(mut data)) => buf.append(&mut data),
+                (ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => buf.append(&mut data),
+                (ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => buf.append(&mut data),
+                (ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => buf.append(&mut data),
+                _ => {
+                    return Err(ProcessorError::ProcessError {
+                        message: "Mismatched buffer and data types when appending".to_string(),
+                    })
+                },
+            };
+
+            println!(
+                "Updated buffer size for {:?}: {} bytes",
+                parquet_type, buffer.buffer_size_bytes
+            );
+
+            // If a file was uploaded, return the metadata update
+            // TODO: if multiple files were uploaded, we should return multiple TransactionContexts.
+            if file_uploaded {
+                println!("File uploaded for {:?}. Returning updated metadata.", parquet_type);
+
+                return Ok(Some(TransactionContext { data: (), metadata }));
+            }
+        }
+
+        // Return None if no upload occurred
+        Ok(None)
+    }
+
+    async fn cleanup(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<()>>>, ProcessorError> {
+        Ok(None)
+    }
+}
+
+impl<U> SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    pub fn new(poll_interval: Duration, buffer_uploader: U) -> Self {
+        Self {
+            internal_buffers: HashMap::new(),
+            internal_buffer_size_bytes: 0,
+            poll_interval,
+            buffer_uploader,
+        }
+    }
+}
+
+#[async_trait]
+impl<U> PollableAsyncStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn poll_interval(&self) -> Duration {
+        self.poll_interval
+    }
+
+    async fn poll(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<()>>>, ProcessorError> {
+        Ok(None)
+    }
+}
+
+impl<U> NamedStep for SizeBufferStep<U>
+where
+    U: Uploadable + Send + 'static + Sync,
+{
+    fn name(&self) -> String {
+        "DefaultTimedSizeBuffer".to_string()
+    }
+}