Skip to content

Commit

Permalink
replace enums with struct with dyn trait to address perofrmance risk …
Browse files Browse the repository at this point in the history
…with having too many enums
  • Loading branch information
yuunlimm committed Nov 22, 2024
1 parent 3d22e3b commit 5b9608c
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 116 deletions.
7 changes: 7 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,4 @@ serde_canonical_json = "1.0.0"
allocative = "0.3.3"
allocative_derive = "0.3.3"
mockall = "0.12.1"
downcast-rs = "1.2.1"
1 change: 1 addition & 0 deletions rust/sdk-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
diesel_migrations = { workspace = true }
downcast-rs = { workspace = true }
field_count = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
Expand Down
241 changes: 173 additions & 68 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::{
utils::database::{new_db_pool, ArcDbPool},
};
use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
use async_trait::async_trait;
use downcast_rs::{impl_downcast, Downcast};
use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
use parquet::schema::types::Type;
use processor::{
Expand All @@ -17,6 +19,7 @@ use processor::{
},
worker::TableFlags,
};
#[allow(unused_imports)]
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
Expand All @@ -29,6 +32,33 @@ pub mod parquet_default_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

/// Trait for providing default instances of supported types.
pub trait ParquetDefault {
fn default_for_type() -> Self;
}

/// Macro to implement ParquetDefault for multiple types.
macro_rules! impl_parquet_default {
($($type:ty),*) => {
$(
impl ParquetDefault for Vec<$type> {
fn default_for_type() -> Self {
Vec::new()
}
}
)*
};
}

// Apply the macro to all supported types
impl_parquet_default!(
MoveResource,
WriteSetChangeModel,
ParquetTransaction,
TableItem,
MoveModule
);

/// Enum representing the different types of Parquet files that can be processed.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
#[strum(serialize_all = "snake_case")]
Expand Down Expand Up @@ -56,84 +86,136 @@ pub enum ParquetTypeEnum {
MoveModule,
}

#[derive(Clone, Debug, strum::EnumDiscriminants)]
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ParquetTypeStructName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum ParquetTypeStructs {
MoveResource(Vec<MoveResource>),
WriteSetChange(Vec<WriteSetChangeModel>),
Transaction(Vec<ParquetTransaction>),
TableItem(Vec<TableItem>),
MoveModule(Vec<MoveModule>),
/// Trait for handling various Parquet types.
#[async_trait]
pub trait ParquetTypeTrait: std::fmt::Debug + Send + Sync + Downcast {
fn parquet_type(&self) -> ParquetTypeEnum;
fn calculate_size(&self) -> usize;

fn append(&mut self, other: Box<dyn ParquetTypeTrait>) -> Result<(), ProcessorError>;

async fn upload_to_gcs(
&self,
uploader: &mut GCSUploader,
parquet_type: ParquetTypeEnum,
table_name: &str,
) -> anyhow::Result<()>;

fn clone_box(&self) -> Box<dyn ParquetTypeTrait>;
}

impl_downcast!(ParquetTypeTrait);

/// Struct for handling Parquet types dynamically.
#[derive(Debug)]
pub struct ParquetTypeStructs {
pub data: Box<dyn ParquetTypeTrait>,
}

impl ParquetTypeStructs {
pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self {
match parquet_type {
ParquetTypeEnum::MoveResource => ParquetTypeStructs::MoveResource(Vec::new()),
ParquetTypeEnum::WriteSetChange => ParquetTypeStructs::WriteSetChange(Vec::new()),
ParquetTypeEnum::Transaction => ParquetTypeStructs::Transaction(Vec::new()),
ParquetTypeEnum::TableItem => ParquetTypeStructs::TableItem(Vec::new()),
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
pub fn new<T: ParquetTypeTrait + 'static>(data: T) -> Self {
Self {
data: Box::new(data),
}
}

pub fn calculate_size(&self) -> usize {
match self {
ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data),
ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data),
ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data),
ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data),
ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data),
}
self.data.calculate_size()
}

/// Appends data to the current buffer within each ParquetTypeStructs variant.
pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> {
match (self, other) {
(ParquetTypeStructs::MoveResource(buf), ParquetTypeStructs::MoveResource(mut data)) => {
buf.append(&mut data);
Ok(())
self.data.append(other.data)
}

pub async fn upload_to_gcs(
&self,
uploader: &mut GCSUploader,
parquet_type: ParquetTypeEnum,
table_name: &str,
) -> anyhow::Result<()> {
self.data
.upload_to_gcs(uploader, parquet_type, table_name)
.await
}

pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self {
match parquet_type {
ParquetTypeEnum::MoveResource => {
ParquetTypeStructs::new(Vec::<MoveResource>::default_for_type())
},
(
ParquetTypeStructs::WriteSetChange(buf),
ParquetTypeStructs::WriteSetChange(mut data),
) => {
buf.append(&mut data);
Ok(())
ParquetTypeEnum::WriteSetChange => {
ParquetTypeStructs::new(Vec::<WriteSetChangeModel>::default_for_type())
},
(ParquetTypeStructs::Transaction(buf), ParquetTypeStructs::Transaction(mut data)) => {
buf.append(&mut data);
Ok(())
ParquetTypeEnum::Transaction => {
ParquetTypeStructs::new(Vec::<ParquetTransaction>::default_for_type())
},
(ParquetTypeStructs::TableItem(buf), ParquetTypeStructs::TableItem(mut data)) => {
buf.append(&mut data);
Ok(())
ParquetTypeEnum::TableItem => {
ParquetTypeStructs::new(Vec::<TableItem>::default_for_type())
},
(ParquetTypeStructs::MoveModule(buf), ParquetTypeStructs::MoveModule(mut data)) => {
buf.append(&mut data);
Ok(())
ParquetTypeEnum::MoveModule => {
ParquetTypeStructs::new(Vec::<MoveModule>::default_for_type())
},
_ => Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
}),
}
}
}

impl Clone for ParquetTypeStructs {
fn clone(&self) -> Self {
Self {
data: self.data.clone_box(),
}
}
}

/// Macro for implementing ParquetTypeTrait for multiple types.
macro_rules! impl_parquet_trait {
($type:ty, $enum_variant:expr) => {
#[async_trait]
impl ParquetTypeTrait for Vec<$type> {
fn parquet_type(&self) -> ParquetTypeEnum {
$enum_variant
}

fn calculate_size(&self) -> usize {
allocative::size_of_unique(self)
}

fn append(&mut self, other: Box<dyn ParquetTypeTrait>) -> Result<(), ProcessorError> {
if let Some(other_data) = other.downcast_ref::<Vec<$type>>() {
self.extend(other_data.clone());
Ok(())
} else {
Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
})
}
}

async fn upload_to_gcs(
&self,
uploader: &mut GCSUploader,
parquet_type: ParquetTypeEnum,
table_name: &str,
) -> anyhow::Result<()> {
uploader
.upload_generic(self, parquet_type, table_name)
.await
}

fn clone_box(&self) -> Box<dyn ParquetTypeTrait> {
Box::new(self.clone())
}
}
};
}

// Apply macro to supported types
impl_parquet_trait!(MoveResource, ParquetTypeEnum::MoveResource);
impl_parquet_trait!(WriteSetChangeModel, ParquetTypeEnum::WriteSetChange);
impl_parquet_trait!(ParquetTransaction, ParquetTypeEnum::Transaction);
impl_parquet_trait!(TableItem, ParquetTypeEnum::TableItem);
impl_parquet_trait!(MoveModule, ParquetTypeEnum::MoveModule);

async fn initialize_gcs_client(credentials: Option<String>) -> Arc<GCSClient> {
if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
Expand All @@ -147,6 +229,7 @@ async fn initialize_gcs_client(credentials: Option<String>) -> Arc<GCSClient> {
Arc::new(GCSClient::new(gcs_config))
}

/// Initializes the database connection pool.
async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result<ArcDbPool> {
match config {
DbConfig::ParquetConfig(ref parquet_config) => {
Expand All @@ -168,6 +251,7 @@ async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result<ArcDbPool
}
}

/// Initializes the Parquet buffer step.
async fn initialize_parquet_buffer_step(
gcs_client: Arc<GCSClient>,
parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
Expand Down Expand Up @@ -203,6 +287,7 @@ async fn initialize_parquet_buffer_step(
Ok(default_size_buffer_step)
}

/// Sets the backfill table flag.
fn set_backfill_table_flag(table_names: HashSet<String>) -> TableFlags {
let mut backfill_table = TableFlags::empty();

Expand All @@ -216,17 +301,37 @@ fn set_backfill_table_flag(table_names: HashSet<String>) -> TableFlags {
}

#[cfg(test)]
mod test {
mod tests {
use super::*;
use strum::VariantNames;

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig.
///
/// To make sure this passes, make sure the variants are in the same order
/// (lexicographical) and the names match.
#[test]
fn test_parquet_type_names_complete() {
assert_eq!(ParquetTypeStructName::VARIANTS, ParquetTypeName::VARIANTS);
fn test_parquet_type_enum_matches_trait() {
let types = vec![
ParquetTypeEnum::MoveResource,
ParquetTypeEnum::WriteSetChange,
ParquetTypeEnum::Transaction,
ParquetTypeEnum::TableItem,
ParquetTypeEnum::MoveModule,
];
for t in types {
// Use a type corresponding to the ParquetTypeEnum
let default = match t {
ParquetTypeEnum::MoveResource => {
ParquetTypeStructs::new(Vec::<MoveResource>::default())
},
ParquetTypeEnum::WriteSetChange => {
ParquetTypeStructs::new(Vec::<WriteSetChangeModel>::default())
},
ParquetTypeEnum::Transaction => {
ParquetTypeStructs::new(Vec::<ParquetTransaction>::default())
},
ParquetTypeEnum::TableItem => ParquetTypeStructs::new(Vec::<TableItem>::default()),
ParquetTypeEnum::MoveModule => {
ParquetTypeStructs::new(Vec::<MoveModule>::default())
},
};

assert_eq!(default.data.parquet_type(), t);
}
}
}
Loading

0 comments on commit 5b9608c

Please sign in to comment.