Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 18, 2024
1 parent b2497f9 commit 6c2a18e
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 75 deletions.
10 changes: 7 additions & 3 deletions rust/sdk-processor/src/steps/common/gcs_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,18 @@ pub struct GCSUploader {

#[async_trait]
pub trait Uploadable {
async fn handle_buffer(
async fn upload_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError>;
}

#[async_trait]
impl Uploadable for GCSUploader {
async fn handle_buffer(
async fn upload_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError> {
// Directly call `upload_generic` for each buffer type
let table_name = buffer.get_table_name();

let result = match buffer {
Expand Down Expand Up @@ -115,6 +114,11 @@ impl GCSUploader {
create_new_writer(schema)
}

/// # Context: Why we replace our writer
///
/// Once we’re ready to upload (either because the buffer is full or enough time has passed),
/// we don’t want to keep adding new data to that same writer. we want a clean slate for the next batch.
/// So, we replace the old writer with a new one to empty the writer buffer without losing any data.
fn get_and_replace_writer(
&mut self,
parquet_type: ParquetTypeEnum,
Expand Down
181 changes: 109 additions & 72 deletions rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[allow(unused_imports)]
use crate::{
parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
steps::common::gcs_uploader::Uploadable,
steps::common::gcs_uploader::{GCSUploader, Uploadable},
};
use anyhow::Result;
use aptos_indexer_processor_sdk::{
Expand All @@ -11,7 +12,6 @@ use aptos_indexer_processor_sdk::{
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use mockall::mock;
use std::{collections::HashMap, time::Duration};
use tracing::debug;

Expand All @@ -36,6 +36,11 @@ impl ParquetBuffer {
/// This is used to track the end version and timestamp of the batch data in the buffer.
pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
if let Some(buffer_metadata) = &mut self.current_batch_metadata {
if buffer_metadata.end_version != cur_batch_metadata.start_version {
// this shouldn't happen but if it does, we want to know
panic!("End version of the buffer metadata does not match the start version of the current batch metadata");
}

// Update metadata fields with the current batch's end information
buffer_metadata.end_version = cur_batch_metadata.end_version;
buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
Expand All @@ -57,21 +62,19 @@ impl ParquetBuffer {
///
/// # Type Parameters
/// - `U`: A type that implements the `Uploadable` trait, providing the uploading functionality.
pub struct ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
{
pub struct ParquetBufferStep {
internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
pub poll_interval: Duration,
pub buffer_uploader: U,
pub buffer_uploader: GCSUploader,
pub buffer_max_size: usize,
}

impl<U> ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
{
pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
impl ParquetBufferStep {
pub fn new(
poll_interval: Duration,
buffer_uploader: GCSUploader,
buffer_max_size: usize,
) -> Self {
Self {
internal_buffers: HashMap::new(),
poll_interval,
Expand All @@ -93,15 +96,13 @@ where
/// We check the size of the buffer + the size of the incoming data before appending it.
/// If the sum of the two exceeds the maximum limit size, it uploads the buffer content to GCS to avoid
/// spliting the batch data, allowing for more efficient and simpler version tracking.
async fn handle_buffer_append(
async fn upload_buffer_append(
&mut self,
parquet_type: ParquetTypeEnum,
parquet_data: ParquetTypeStructs,
cur_batch_metadata: &TransactionMetadata,
upload_metadata_map: &mut HashMap<ParquetTypeEnum, TransactionMetadata>,
) -> Result<(), ProcessorError> {
let mut file_uploaded = false;

// Get or initialize the buffer for the specific ParquetTypeEnum
let buffer = self
.internal_buffers
Expand Down Expand Up @@ -133,28 +134,19 @@ where
&mut buffer.buffer,
ParquetTypeStructs::default_for_type(&parquet_type),
);
self.buffer_uploader.handle_buffer(struct_buffer).await?;
file_uploaded = true;
self.buffer_uploader.upload_buffer(struct_buffer).await?;

// update this metadata before insert
upload_metadata_map
.insert(parquet_type, buffer.current_batch_metadata.clone().unwrap());
buffer.buffer_size_bytes = 0;
buffer.current_batch_metadata = None;
}

// Append new data to the buffer
Self::append_to_buffer(buffer, parquet_data)?;
buffer.update_current_batch_metadata(cur_batch_metadata);

// if it wasn't uploaded -> we update only end_version, size, and last timestamp
if file_uploaded {
if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
buffer_metadata.start_version = cur_batch_metadata.start_version;
buffer_metadata.start_transaction_timestamp =
cur_batch_metadata.start_transaction_timestamp.clone();
}
}

debug!(
"Updated buffer size for {:?}: {} bytes",
parquet_type, buffer.buffer_size_bytes,
Expand All @@ -164,10 +156,7 @@ where
}

#[async_trait]
impl<U> Processable for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
{
impl Processable for ParquetBufferStep {
type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
type RunType = PollableAsyncRunType;
Expand All @@ -184,7 +173,7 @@ where

let mut upload_metadata_map = HashMap::new();
for (parquet_type, parquet_data) in item.data {
self.handle_buffer_append(
self.upload_buffer_append(
parquet_type,
parquet_data,
&item.metadata,
Expand Down Expand Up @@ -214,11 +203,17 @@ where
ParquetTypeStructs::default_for_type(&parquet_type),
);

self.buffer_uploader.handle_buffer(struct_buffer).await?;
self.buffer_uploader.upload_buffer(struct_buffer).await?;

if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
buffer_metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
metadata_map.insert(parquet_type, buffer_metadata.clone());
} else {
// This should never happen
panic!(
"Buffer metadata is missing for ParquetTypeEnum: {:?}",
parquet_type
);
}
}
}
Expand All @@ -236,10 +231,7 @@ where
}

#[async_trait]
impl<U> PollableAsyncStep for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
{
impl PollableAsyncStep for ParquetBufferStep {
fn poll_interval(&self) -> Duration {
self.poll_interval
}
Expand All @@ -259,7 +251,7 @@ where
ParquetTypeStructs::default_for_type(&parquet_type),
);

self.buffer_uploader.handle_buffer(struct_buffer).await?;
self.buffer_uploader.upload_buffer(struct_buffer).await?;

let metadata = buffer.current_batch_metadata.clone().unwrap();
metadata_map.insert(parquet_type, metadata);
Expand All @@ -279,47 +271,48 @@ where
}
}

impl<U> NamedStep for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
{
impl NamedStep for ParquetBufferStep {
fn name(&self) -> String {
"ParquetBufferStep".to_string()
}
}

mock! {
pub Uploadable {}
#[async_trait]
impl Uploadable for Uploadable {
async fn handle_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
}
}

#[cfg(test)]
mod tests {
use crate::steps::common::parquet_buffer_step::{
MockUploadable, ParquetBufferStep, ParquetTypeStructs,
use crate::{
config::processor_config::ParquetDefaultProcessorConfig,
steps::common::{
gcs_uploader::{create_new_writer, GCSUploader},
parquet_buffer_step::{ParquetBufferStep, ParquetTypeEnum, ParquetTypeStructs},
},
};
use aptos_indexer_processor_sdk::{
traits::Processable,
types::transaction_context::{TransactionContext, TransactionMetadata},
};
use std::{collections::HashMap, time::Duration};
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
db::parquet::models::default_models::parquet_move_resources::MoveResource,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_parquet_buffer_step_no_upload() {
async fn test_parquet_buffer_step_no_upload() -> anyhow::Result<()> {
let poll_interval = Duration::from_secs(10);
let buffer_max_size = 100;
use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;

let mock_uploader = MockUploadable::new();
let parquet_processor_config = create_parquet_processor_config();

let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?;
let mut parquet_step =
ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size);

// Test the `process` method with data below `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
Expand All @@ -330,30 +323,27 @@ mod tests {
.process(TransactionContext { data, metadata })
.await
.unwrap();

assert!(
result.is_none(),
"Expected no upload for data below buffer_max_size"
);

Ok(())
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_parquet_buffer_step() {
async fn test_parquet_buffer_step_trigger_upload() -> anyhow::Result<()> {
let poll_interval = Duration::from_secs(10);
let buffer_max_size = 25; // default parquetTYpeStruct for move resource is 24 bytes
use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;

let mut mock_uploader = MockUploadable::new();

// Set up expectations for the mock uploader
mock_uploader
.expect_handle_buffer()
.times(1)
.returning(|_| Ok(()));
let buffer_max_size = 25; // Default ParquetTypeStructs for MoveResource is 24 bytes
let parquet_processor_config = create_parquet_processor_config();

let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?;
let mut parquet_step =
ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
// Test the `process` method with data below `buffer_max_size`
ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size);

// Test data below `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
Expand All @@ -369,7 +359,7 @@ mod tests {
"Expected no upload for data below buffer_max_size"
);

// Test the `process` method with data below `buffer_max_size`
// Test buffer + data > `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
Expand All @@ -380,9 +370,56 @@ mod tests {
.process(TransactionContext { data, metadata })
.await
.unwrap();

assert!(
result.is_some(),
"Expected upload when data exceeds buffer_max_size"
);

Ok(())
}

async fn create_parquet_uploader(
parquet_processor_config: &ParquetDefaultProcessorConfig,
) -> anyhow::Result<GCSUploader> {
let gcs_config = GcsClientConfig::default()
.with_auth()
.await
.expect("Failed to create GCS client config");
let gcs_client = Arc::new(GCSClient::new(gcs_config));

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> =
[(ParquetTypeEnum::MoveResource, MoveResource::schema())]
.into_iter()
.collect();

let parquet_type_to_writer = parquet_type_to_schemas
.iter()
.map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
})
.collect();

GCSUploader::new(
gcs_client,
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
"processor_name".to_string(),
)
}

fn create_parquet_processor_config() -> ParquetDefaultProcessorConfig {
ParquetDefaultProcessorConfig {
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
parquet_upload_interval: 180,
max_buffer_size: 100,
channel_size: 100,
google_application_credentials: None,
tables: HashSet::new(),
}
}
}

0 comments on commit 6c2a18e

Please sign in to comment.