
Commit

consolidated step for all
yuunlimm committed Nov 12, 2024
1 parent 97443c8 commit 99b41a8
Showing 5 changed files with 454 additions and 208 deletions.
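
In short, the five per-table TimedSizeBufferStep instances (and the fanout/fanin plumbing around them) are replaced by a single SizeBufferStep fed by a GCSUploader that carries one schema and one writer per ParquetTypeEnum variant. A condensed paraphrase of the new wiring from the diff below (bucket, interval, and config arguments abbreviated):

    let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
        (ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
        // ... one entry per table (MoveResource, WriteSetChange, TableItem, MoveModule)
    ]
    .into_iter()
    .collect();

    // Derive a parquet writer per type from its schema.
    let parquet_type_to_writer = parquet_type_to_schemas
        .iter()
        .map(|(key, schema)| (*key, create_new_writer(schema.clone()).expect("Failed to create writer")))
        .collect();

    // One uploader owns the GCS client plus both maps ...
    let buffer_uploader = GCSUploader::new(
        gcs_client.clone(),
        parquet_type_to_schemas,
        parquet_type_to_writer,
        bucket_name,
        bucket_root,
        processor_name,
    )?;

    // ... and one buffer step replaces the five TimedSizeBufferStep branches.
    let default_size_buffer_step = SizeBufferStep::new(upload_interval, buffer_uploader);
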
195 changes: 32 additions & 163 deletions rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -32,6 +32,14 @@ use processor::db::common::models::default_models::{
};
use std::{sync::Arc, time::Duration};
use tracing::{debug, info};
use std::collections::HashMap;
use crate::parquet_processors::ParquetTypeEnum;
use parquet::schema::types::Type;
use crate::steps::parquet_default_processor::gcs_handler::create_new_writer;
use crate::steps::parquet_default_processor::gcs_handler::GCSUploader;
use processor::bq_analytics::generic_parquet_processor::HasParquetSchema;
use crate::steps::parquet_default_processor::size_buffer::SizeBufferStep;


const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

@@ -164,181 +172,42 @@ impl ProcessorTrait for ParquetDefaultProcessor {

let gcs_client = Arc::new(GCSClient::new(gcs_config));

let generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
[ParquetTransaction::default()].as_slice().schema().unwrap(),
self.name().to_string(),
)
.unwrap();

let transaction_step = TimedSizeBufferStep::<
Input,
ParquetTransaction,
GenericParquetBufferHandler<ParquetTransaction>,
>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "transactions".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
gcs_client.clone(),
self.name(),
generic_parquet_buffer_handler,
);
let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
(ParquetTypeEnum::MoveResource, MoveResource::schema()),
(ParquetTypeEnum::WriteSetChange, WriteSetChangeModel::schema()),
(ParquetTypeEnum::Transaction, ParquetTransaction::schema()),
(ParquetTypeEnum::TableItem, TableItem::schema()),
(ParquetTypeEnum::MoveModule, MoveModule::schema()),
].into_iter().collect();

let move_resource_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
[MoveResource::default()].as_slice().schema().unwrap(),
self.name().to_string(),
)
.unwrap();
let parquet_type_to_writer = parquet_type_to_schemas.iter().map(|(key, schema)| {
let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
(*key, writer)
}).collect();

let move_resource_step = TimedSizeBufferStep::<
Input,
MoveResource,
GenericParquetBufferHandler<MoveResource>,
>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "move_resources".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
let buffer_uploader = GCSUploader::new(
gcs_client.clone(),
self.name(),
move_resource_generic_parquet_buffer_handler,
);

let wsc_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
parquet_type_to_schemas,
parquet_type_to_writer,
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
[WriteSetChangeModel::default()]
.as_slice()
.schema()
.unwrap(),
self.name().to_string(),
)
.unwrap();

let write_set_change_step = TimedSizeBufferStep::<
Input,
WriteSetChangeModel,
GenericParquetBufferHandler<WriteSetChangeModel>,
>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "write_set_changes".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
gcs_client.clone(),
self.name(),
wsc_generic_parquet_buffer_handler,
);

let table_items_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
[TableItem::default()].as_slice().schema().unwrap(),
self.name().to_string(),
)
.unwrap();

let table_item_step =
TimedSizeBufferStep::<Input, TableItem, GenericParquetBufferHandler<TableItem>>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "table_items".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
gcs_client.clone(),
self.name(),
table_items_generic_parquet_buffer_handler,
);

let move_module_generic_parquet_buffer_handler = GenericParquetBufferHandler::new(
parquet_processor_config.bucket_name.clone(),
parquet_processor_config.bucket_root.clone(),
[MoveModule::default()].as_slice().schema().unwrap(),
self.name().to_string(),
)
.unwrap();

let move_module_step =
TimedSizeBufferStep::<Input, MoveModule, GenericParquetBufferHandler<MoveModule>>::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
TableConfig {
table_name: "move_modules".to_string(),
bucket_name: parquet_processor_config.bucket_name.clone(),
bucket_root: parquet_processor_config.bucket_root.clone(),
max_size: parquet_processor_config.max_buffer_size,
},
gcs_client.clone(),
self.name(),
move_module_generic_parquet_buffer_handler,
);
)?;

let channel_size = parquet_processor_config.channel_size;

let builder = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
let default_size_buffer_step = SizeBufferStep::new(
Duration::from_secs(parquet_processor_config.parquet_upload_interval),
buffer_uploader
);

let builder =
builder.connect_to(parquet_default_extractor.into_runnable_step(), channel_size);
let mut fanout_builder = builder.fanout_broadcast(5);

let (first_builder, first_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(transaction_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

let (second_builder, second_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(move_resource_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

let (third_builder, third_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(write_set_change_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

let (fourth_builder, fourth_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(table_item_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

let (fifth_builder, fifth_output_receiver) = fanout_builder
.get_processor_builder()
.unwrap()
.connect_to(move_module_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

let (_, buffer_receiver) = ProcessorBuilder::new_with_fanin_step_with_receivers(
vec![
(first_output_receiver, first_builder.graph),
(second_output_receiver, second_builder.graph),
(third_output_receiver, third_builder.graph),
(fourth_output_receiver, fourth_builder.graph),
(fifth_output_receiver, fifth_builder.graph),
],
// TODO: Replace with parquet version tracker
RunnableAsyncStep::new(PassThroughStep::new_named("FaninStep".to_string())),
channel_size,
// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.end_and_return_output_receiver(channel_size);
.connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

// (Optional) Parse the results
loop {
// ... (remaining unchanged lines collapsed in the diff view)
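
The new SizeBufferStep lives in size_buffer.rs, which is among the five changed files but is not shown in this excerpt. Purely as a hypothetical illustration (not the file's actual contents), the core decision such a step has to make can be sketched in terms of the Uploadable trait defined in gcs_handler.rs below: accumulate ParquetTypeStructs per ParquetTypeEnum and hand each full buffer to the uploader once the configured size threshold is crossed.

    // Hypothetical sketch only; the real step also implements the SDK's step traits
    // and honors the upload interval passed to SizeBufferStep::new.
    async fn flush_if_full<U: Uploadable>(
        uploader: &mut U,
        buffers: &mut HashMap<ParquetTypeEnum, ParquetTypeStructs>,
        buffered_bytes: &mut usize,
        max_size: usize,
    ) -> Result<(), ProcessorError> {
        if *buffered_bytes < max_size {
            return Ok(());
        }
        // Flush every buffered parquet type through the uploader.
        for (_, buffer) in buffers.drain() {
            uploader.handle_buffer(buffer).await?;
        }
        *buffered_bytes = 0;
        Ok(())
    }
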
164 changes: 164 additions & 0 deletions rust/sdk-processor/src/steps/parquet_default_processor/gcs_handler.rs
@@ -0,0 +1,164 @@
use allocative::Allocative;
use anyhow::{anyhow, Context};
use aptos_indexer_processor_sdk::{
common_steps::timed_size_buffer_step::BufferHandler,
traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable},
types::transaction_context::TransactionMetadata,
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use google_cloud_storage::client::Client as GCSClient;
use parquet::{
file::{properties::WriterProperties, writer::SerializedFileWriter},
record::RecordWriter,
schema::types::Type,
};
use processor::{
bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp,
};
use std::{marker::PhantomData, path::PathBuf, sync::Arc};
use tracing::{debug, error};
use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs};
use std::collections::HashMap;

pub struct GCSUploader {
gcs_client: Arc<GCSClient>,
parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
pub bucket_name: String,
pub bucket_root: String,
pub processor_name: String,
}


#[async_trait]
pub trait Uploadable {
async fn handle_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError>;
}


#[async_trait]
impl Uploadable for GCSUploader {
async fn handle_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<(), ProcessorError> {
if let Err(e) = self.upload_buffer(buffer).await {
error!("Failed to upload buffer: {}", e);
return Err(ProcessorError::ProcessError {
message: format!("Failed to upload buffer: {}", e),
});
}
Ok(())
}
}

pub fn create_new_writer(schema: Arc<Type>) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
.build();
let props_arc = Arc::new(props);

SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
}
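
// Each writer produced here targets an in-memory Vec<u8>. Once row groups have been
// written, the finished parquet bytes can be recovered with `SerializedFileWriter::into_inner`,
// which writes the file footer and returns the underlying buffer; the commented-out
// upload path below relies on exactly that before handing the bytes to GCS.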

impl GCSUploader {
fn create_new_writer(&self, parquet_type: ParquetTypeEnum) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
let schema = self
.parquet_type_to_schemas
.get(&parquet_type)
.ok_or_else(|| anyhow!("Parquet type not found in schemas"))?
.clone();
create_new_writer(schema)
}

// fn close_writer(&mut self) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
//
// // let new_writer = self.create_new_writer()?;
// // let old_writer = std::mem::replace(&mut self.writer, new_writer);
// // Ok(old_writer)
// }

pub fn new(
gcs_client: Arc<GCSClient>,
parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
parquet_type_to_writer: HashMap<ParquetTypeEnum, SerializedFileWriter<Vec<u8>>>,
bucket_name: String,
bucket_root: String,
processor_name: String,
) -> anyhow::Result<Self> {
Ok(Self {
gcs_client,
parquet_type_to_schemas,
parquet_type_to_writer,
bucket_name,
bucket_root,
processor_name,
})
}
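
// The two maps are expected to cover the same ParquetTypeEnum keys; the processor builds
// `parquet_type_to_writer` directly from `parquet_type_to_schemas`, so every buffered type
// has a matching writer to flush into.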

async fn upload_buffer(
&mut self,
buffer: ParquetTypeStructs,
) -> anyhow::Result<()> {

// let buffer = match buffer {
// ParquetTypeStructs::TransactionStructs(buffer) => buffer,
// _ => return Err(anyhow!("Buffer is not a transaction struct")),
// };
// if buffer.is_empty() {
// debug!("Buffer is empty, skipping upload.");
// return Ok(());
// }
// let struts = buffer.get_structs();
//
// let first = struts
// .first()
// .context("Buffer is not empty but has no first element")?;
// let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp());
// let start_version = first.version();
// let last = buffer
// .last()
// .context("Buffer is not empty but has no last element")?;
// let end_version = last.version();
// let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());
// let mut row_group_writer = self
// .writer
// .next_row_group()
// .context("Failed to get row group")?;
//
// buffer
// .as_slice()
// .write_to_row_group(&mut row_group_writer)
// .context("Failed to write to row group")?;
// row_group_writer
// .close()
// .context("Failed to close row group")?;
//
// let old_writer = self.close_writer().context("Failed to close writer")?;
// let upload_buffer = old_writer
// .into_inner()
// .context("Failed to get inner buffer")?;
//
// let bucket_root = PathBuf::from(&self.bucket_root);
//
// upload_parquet_to_gcs(
// &gcs_client,
// upload_buffer,
// ParquetType::TABLE_NAME,
// &self.bucket_name,
// &bucket_root,
// self.processor_name.clone(),
// )
// .await?;
//
// metadata.start_version = start_version as u64;
// metadata.end_version = end_version as u64;
// metadata.start_transaction_timestamp = Some(first_transaction_timestamp);
// metadata.end_transaction_timestamp = Some(last_transaction_timestamp);

Ok(())
}
}
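
upload_buffer is still a stub in this commit; the intended flow survives only in the commented-out lines above. A rough sketch of how that flow could be restored for a single parquet type, using only the calls those comments already reference (the exact upload_parquet_to_gcs signature and the RecordWriter bound are assumptions carried over from them, and the imports are the ones at the top of gcs_handler.rs):

    async fn upload_one_type<ParquetType>(
        gcs_client: &GCSClient,
        writer: &mut SerializedFileWriter<Vec<u8>>,
        schema: Arc<Type>,
        rows: &[ParquetType],
        bucket_name: &str,
        bucket_root: &PathBuf,
        processor_name: String,
    ) -> anyhow::Result<()>
    where
        ParquetType: NamedTable,
        for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
    {
        if rows.is_empty() {
            debug!("Buffer is empty, skipping upload.");
            return Ok(());
        }

        // Write the buffered rows into the current writer as a single row group.
        let mut row_group_writer = writer.next_row_group().context("Failed to get row group")?;
        rows.write_to_row_group(&mut row_group_writer)
            .context("Failed to write to row group")?;
        row_group_writer.close().context("Failed to close row group")?;

        // Swap in a fresh writer and turn the finished one into raw parquet bytes.
        let finished_writer = std::mem::replace(writer, create_new_writer(schema)?);
        let upload_buffer = finished_writer
            .into_inner()
            .context("Failed to get inner buffer")?;

        upload_parquet_to_gcs(
            gcs_client,
            upload_buffer,
            ParquetType::TABLE_NAME,
            bucket_name,
            bucket_root,
            processor_name,
        )
        .await?;

        Ok(())
    }

The commented-out version also records start/end versions and transaction timestamps into TransactionMetadata for the version tracker; that bookkeeping is omitted in this sketch.
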
2 changes: 2 additions & 0 deletions rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -1,3 +1,5 @@
pub mod generic_parquet_buffer_handler;
pub mod parquet_default_extractor;
pub mod size_buffer;
pub mod gcs_handler;
// pub mod timed_size_buffer;