Commit

add comments
yuunlimm committed Nov 18, 2024
1 parent 80684f0 commit 40b32d4
Showing 2 changed files with 37 additions and 42 deletions.
10 changes: 7 additions & 3 deletions rust/sdk-processor/src/steps/common/gcs_uploader.rs
@@ -26,19 +26,18 @@ pub struct GCSUploader {

#[async_trait]
pub trait Uploadable {
async fn handle_buffer(
async fn upload_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError>;
}

#[async_trait]
impl Uploadable for GCSUploader {
async fn handle_buffer(
async fn upload_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError> {
// Directly call `upload_generic` for each buffer type
let table_name = buffer.get_table_name();

let result = match buffer {
@@ -115,6 +114,11 @@ impl GCSUploader {
create_new_writer(schema)
}

/// # Context: Why we replace our writer
///
/// Once we’re ready to upload (either because the buffer is full or because enough time has passed),
/// we don’t want to keep adding new data to that same writer; we want a clean slate for the next batch.
/// So we replace the old writer with a new one, emptying the writer buffer without losing any data.
fn get_and_replace_writer(
&mut self,
parquet_type: ParquetTypeEnum,
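For context on the swap described in the new comment above, here is a minimal sketch (not the repository's exact code) of the pattern: the stored writer is replaced with a fresh one built by a create_new_writer-style factory, and the old writer, which still owns the batch's buffered rows, is handed back for flushing and upload. The names and the generic writer type are illustrative assumptions.

    // Illustrative sketch: swap out the current writer and return the old one.
    // `W` stands in for whatever writer type GCSUploader actually stores.
    fn get_and_replace_writer_sketch<W>(current: &mut W, make_new: impl FnOnce() -> W) -> W {
        // The replaced writer still holds everything buffered for this batch,
        // so nothing is lost; the new writer starts with an empty buffer.
        std::mem::replace(current, make_new())
    }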
69 changes: 30 additions & 39 deletions rust/sdk-processor/src/steps/common/parquet_buffer_step.rs
Expand Up @@ -14,6 +14,8 @@ use async_trait::async_trait;
use mockall::mock;
use std::{collections::HashMap, time::Duration};
use tracing::debug;
use crate::steps::common::gcs_uploader::GCSUploader;


/// `ParquetBuffer` is a struct that holds `ParquetTypeStructs` data
/// and tracks the buffer size in bytes, along with metadata about the data in the buffer.
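A minimal sketch of the shape that doc comment describes, with field names inferred from how the buffer is used later in this diff; the real definition and exact types may differ.

    // Assumed layout; ParquetTypeStructs and TransactionMetadata come from this crate.
    pub struct ParquetBuffer {
        // Accumulated rows for one parquet type.
        pub buffer: ParquetTypeStructs,
        // Running size of `buffer` in bytes, compared against `buffer_max_size`.
        pub buffer_size_bytes: usize,
        // Version range and timestamps for the buffered data; `None` right after a flush.
        pub current_batch_metadata: Option<TransactionMetadata>,
    }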
@@ -36,6 +38,11 @@ impl ParquetBuffer {
/// This is used to track the end version and timestamp of the batch data in the buffer.
pub fn update_current_batch_metadata(&mut self, cur_batch_metadata: &TransactionMetadata) {
if let Some(buffer_metadata) = &mut self.current_batch_metadata {
if buffer_metadata.end_version != cur_batch_metadata.start_version {
// This shouldn't happen, but if it does, we want to know immediately.
panic!("End version of the buffer metadata does not match the start version of the current batch metadata");
}

// Update metadata fields with the current batch's end information
buffer_metadata.end_version = cur_batch_metadata.end_version;
buffer_metadata.total_size_in_bytes = cur_batch_metadata.total_size_in_bytes;
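A small, invented illustration of the continuity rule that the new panic enforces: the incoming batch must start exactly where the buffered metadata ends, so the buffered version range stays contiguous.

    fn continuity_example() {
        // Invented versions, purely illustrative.
        let buffered_end_version: u64 = 150;   // buffer_metadata.end_version
        let incoming_start_version: u64 = 150; // cur_batch_metadata.start_version
        // Mirrors the check above: any gap or overlap indicates an upstream bug.
        assert_eq!(buffered_end_version, incoming_start_version);
    }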
@@ -57,21 +64,17 @@
///
/// # Type Parameters
/// - `U`: A type that implements the `Uploadable` trait, providing the uploading functionality.
pub struct ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
pub struct ParquetBufferStep
{
internal_buffers: HashMap<ParquetTypeEnum, ParquetBuffer>,
pub poll_interval: Duration,
pub buffer_uploader: U,
pub buffer_uploader: GCSUploader,
pub buffer_max_size: usize,
}

impl<U> ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
impl ParquetBufferStep
{
pub fn new(poll_interval: Duration, buffer_uploader: U, buffer_max_size: usize) -> Self {
pub fn new(poll_interval: Duration, buffer_uploader: GCSUploader, buffer_max_size: usize) -> Self {
Self {
internal_buffers: HashMap::new(),
poll_interval,
@@ -93,7 +96,7 @@ where
/// We check the size of the buffer + the size of the incoming data before appending it.
/// If the sum of the two exceeds the maximum limit size, it uploads the buffer content to GCS to avoid
/// splitting the batch data, allowing for more efficient and simpler version tracking.
async fn handle_buffer_append(
async fn upload_buffer_append(
&mut self,
parquet_type: ParquetTypeEnum,
parquet_data: ParquetTypeStructs,
@@ -133,28 +136,20 @@
&mut buffer.buffer,
ParquetTypeStructs::default_for_type(&parquet_type),
);
self.buffer_uploader.handle_buffer(struct_buffer).await?;
self.buffer_uploader.upload_buffer(struct_buffer).await?;
file_uploaded = true;

// Record the uploaded buffer's metadata before resetting it for the next batch
upload_metadata_map
.insert(parquet_type, buffer.current_batch_metadata.clone().unwrap());
buffer.buffer_size_bytes = 0;
buffer.current_batch_metadata = None;
}

// Append new data to the buffer
Self::append_to_buffer(buffer, parquet_data)?;
buffer.update_current_batch_metadata(cur_batch_metadata);

// If a file was uploaded above, the buffer starts a fresh batch, so take the start version and
// timestamp from the current batch; otherwise only end_version, size, and the last timestamp were updated above.
if file_uploaded {
if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
buffer_metadata.start_version = cur_batch_metadata.start_version;
buffer_metadata.start_transaction_timestamp =
cur_batch_metadata.start_transaction_timestamp.clone();
}
}

debug!(
"Updated buffer size for {:?}: {} bytes",
parquet_type, buffer.buffer_size_bytes,
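The upload-before-append rule documented earlier in this hunk (upload when the buffered size plus the incoming size would exceed the limit, so a batch is never split across files) boils down to a single comparison. A hedged sketch with illustrative names, not the repository's code:

    fn should_upload_before_append(buffer_size_bytes: usize, incoming_size: usize, buffer_max_size: usize) -> bool {
        buffer_size_bytes + incoming_size > buffer_max_size
    }

    // With the numbers used in the tests below (buffer_max_size = 25, a default
    // MoveResource batch of ~24 bytes): the first append fits (0 + 24 <= 25),
    // while the second triggers an upload first (24 + 24 > 25).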
@@ -164,9 +159,7 @@ where
}

#[async_trait]
impl<U> Processable for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
impl Processable for ParquetBufferStep
{
type Input = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
@@ -184,7 +177,7 @@ where

let mut upload_metadata_map = HashMap::new();
for (parquet_type, parquet_data) in item.data {
self.handle_buffer_append(
self.upload_buffer_append(
parquet_type,
parquet_data,
&item.metadata,
@@ -214,11 +207,14 @@ where
ParquetTypeStructs::default_for_type(&parquet_type),
);

self.buffer_uploader.handle_buffer(struct_buffer).await?;
self.buffer_uploader.upload_buffer(struct_buffer).await?;

if let Some(buffer_metadata) = &mut buffer.current_batch_metadata {
buffer_metadata.total_size_in_bytes = buffer.buffer_size_bytes as u64;
metadata_map.insert(parquet_type, buffer_metadata.clone());
} else {
// This should never happen
panic!("Buffer metadata is missing for ParquetTypeEnum: {:?}", parquet_type);
}
}
}
@@ -236,9 +232,7 @@ where
}

#[async_trait]
impl<U> PollableAsyncStep for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
impl PollableAsyncStep for ParquetBufferStep
{
fn poll_interval(&self) -> Duration {
self.poll_interval
@@ -259,7 +253,7 @@ where
ParquetTypeStructs::default_for_type(&parquet_type),
);

self.buffer_uploader.handle_buffer(struct_buffer).await?;
self.buffer_uploader.upload_buffer(struct_buffer).await?;

let metadata = buffer.current_batch_metadata.clone().unwrap();
metadata_map.insert(parquet_type, metadata);
@@ -279,9 +273,7 @@ where
}
}

impl<U> NamedStep for ParquetBufferStep<U>
where
U: Uploadable + Send + 'static + Sync,
impl NamedStep for ParquetBufferStep
{
fn name(&self) -> String {
"ParquetBufferStep".to_string()
@@ -292,7 +284,7 @@ mock! {
pub Uploadable {}
#[async_trait]
impl Uploadable for Uploadable {
async fn handle_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
async fn upload_buffer(&mut self, buffer: ParquetTypeStructs) -> Result<(), ProcessorError>;
}
}

@@ -319,7 +311,6 @@ mod tests {
let mut parquet_step =
ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);

// Test the `process` method with data below `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -338,22 +329,22 @@

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_parquet_buffer_step() {
async fn test_parquet_buffer_step_trigger_upload() {
let poll_interval = Duration::from_secs(10);
let buffer_max_size = 25; // default parquetTYpeStruct for move resource is 24 bytes
let buffer_max_size = 25; // default ParquetTypeStructs for move resource is 24 bytes
use crate::steps::common::parquet_buffer_step::ParquetTypeEnum;

let mut mock_uploader = MockUploadable::new();

// Set up expectations for the mock uploader
mock_uploader
.expect_handle_buffer()
.expect_upload_buffer()
.times(1)
.returning(|_| Ok(()));

let mut parquet_step =
ParquetBufferStep::new(poll_interval, mock_uploader, buffer_max_size);
// Test the `process` method with data below `buffer_max_size`

// Test the `process` method with data still below `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
@@ -369,7 +360,7 @@
"Expected no upload for data below buffer_max_size"
);

// Test the `process` method with data below `buffer_max_size`
// Test the `process` method with buffer + data > `buffer_max_size`
let data = HashMap::from([(
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::default_for_type(&ParquetTypeEnum::MoveResource),
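Since this change pins the step to a concrete GCSUploader instead of any U: Uploadable, constructing it now looks roughly like the sketch below. The helper and values are assumptions (GCSUploader's constructor is not shown in this diff); only the ParquetBufferStep::new signature comes from the change above.

    use std::time::Duration;

    // Assumes the paths used in this diff are in scope:
    // crate::steps::common::gcs_uploader::GCSUploader and ParquetBufferStep.
    fn build_parquet_buffer_step(uploader: GCSUploader) -> ParquetBufferStep {
        ParquetBufferStep::new(
            Duration::from_secs(10), // poll_interval: flush buffers at least this often
            uploader,                // concrete GCSUploader (previously any U: Uploadable)
            2500,                    // buffer_max_size in bytes (illustrative value)
        )
    }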
