Skip to content

Commit

Permalink
[parquet-sdk-migration] add a logic to determine the starting version…
Browse files Browse the repository at this point in the history
… for parquet processor (#587)

[parquet-sdk-migration] add a logic to determine the starting version for parquet processor

add docstring and update the name of functions

fanout builder failing with send trait
  • Loading branch information
yuunlimm authored Nov 14, 2024
1 parent 5d25987 commit f828135
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 17 deletions.
163 changes: 150 additions & 13 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use crate::processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
use crate::{
parquet_processors::ParquetTypeEnum,
processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
},
};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::IntoEnumIterator;

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
Expand Down Expand Up @@ -63,23 +68,45 @@ impl ProcessorConfig {
///
/// This is a convenience method to map the table names to include the processor name as a prefix, which
/// is useful for querying the status from the processor status table in the database.
pub fn get_table_names(&self) -> Option<Vec<String>> {
pub fn get_table_names(&self) -> anyhow::Result<Vec<String>> {
match self {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();
// Use the tables from the config and map them to include the prefix
Some(
config
.tables
.iter()
.map(|table_name| format!("{}_{}", prefix, table_name))
.collect(),
)

// Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();

// Validate and map table names with prefix
let mut validated_table_names = Vec::new();
for table_name in &config.tables {
// Ensure the table name is a valid `ParquetTypeEnum` variant
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}

// Append the prefix to the validated table name
validated_table_names.push(Self::format_table_name(prefix, table_name));
}

Ok(validated_table_names)
},
_ => None, // For all other processor types, return None
_ => Err(anyhow::anyhow!(
"Invalid parquet processor config: {:?}",
self
)),
}
}

/// Builds the fully qualified table name by joining the processor name and the
/// table name with a `.` separator.
fn format_table_name(prefix: &str, table_name: &str) -> String {
    [prefix, table_name].join(".")
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -152,3 +179,113 @@ impl ParquetDefaultProcessorConfig {
1800 // 30 minutes
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ParquetDefaultProcessor` config whose only varying input is the
    /// set of table names; every other field uses the same fixed test values.
    fn parquet_config_with_tables(tables: HashSet<String>) -> ProcessorConfig {
        ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
            tables,
            bucket_name: "bucket_name".to_string(),
            bucket_root: "bucket_root".to_string(),
            google_application_credentials: None,
            parquet_handler_response_channel_size: 10,
            max_buffer_size: 100000,
            parquet_upload_interval: 1800,
        })
    }

    #[test]
    fn test_valid_table_names() {
        let config = parquet_config_with_tables(HashSet::from([
            "MoveResource".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_ok());

        // `tables` is a `HashSet`, so the order of the returned names is arbitrary.
        // Sort before comparing so the assertion does not depend on hash order.
        let mut table_names = result.unwrap();
        table_names.sort_unstable();

        assert_eq!(table_names, vec![
            "parquet_default_processor.MoveResource".to_string(),
            "parquet_default_processor.Transaction".to_string(),
        ]);
    }

    #[test]
    fn test_invalid_table_name() {
        let config = parquet_config_with_tables(HashSet::from([
            "InvalidTable".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_err());

        let error_message = result.unwrap_err().to_string();
        assert!(error_message.contains("Invalid table name 'InvalidTable'"));
        assert!(error_message.contains("Expected one of:"));
    }

    #[test]
    fn test_empty_tables() {
        let config = parquet_config_with_tables(HashSet::new());

        let result = config.get_table_names();
        assert!(result.is_ok());

        let table_names = result.unwrap();
        assert_eq!(table_names, Vec::<String>::new());
    }

    #[test]
    fn test_duplicate_table_names() {
        // The `HashSet` collapses the duplicate, so exactly one name is expected back.
        let config = parquet_config_with_tables(HashSet::from([
            "Transaction".to_string(),
            "Transaction".to_string(),
        ]));

        let result = config.get_table_names();
        assert!(result.is_ok());

        let table_names = result.unwrap();
        assert_eq!(table_names, vec![
            "parquet_default_processor.Transaction".to_string(),
        ]);
    }

    #[test]
    fn test_all_enum_table_names() {
        let config = parquet_config_with_tables(
            ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
        );

        let result = config.get_table_names();
        assert!(result.is_ok());

        // Compare as sets: the returned order follows `HashSet` iteration order.
        let table_names: HashSet<String> = result.unwrap().into_iter().collect();
        let expected_names: HashSet<String> = ParquetTypeEnum::iter()
            .map(|e| format!("parquet_default_processor.{}", e))
            .collect();
        assert_eq!(table_names, expected_names);
    }
}
86 changes: 86 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,87 @@
use processor::db::common::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
parquet_write_set_changes::WriteSetChangeModel,
};
use serde::{Deserialize, Serialize};
use strum::{Display, EnumIter};

pub mod parquet_default_processor;

/// Enum representing the different types of Parquet files that can be processed.
///
/// The `Display` string of each variant is what `ProcessorConfig::get_table_names`
/// accepts as a valid table name, and `EnumIter` is what lets it enumerate every
/// supported type.
// In test builds only, `EnumDiscriminants` additionally generates a companion enum
// named `ParquetTypeName` (snake_case variant names). The test module compares its
// `VARIANTS` list against `ParquetTypeStructName::VARIANTS`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
strum_discriminants(
derive(
strum::EnumVariantNames,
Deserialize,
Serialize,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ParquetTypeName),
strum(serialize_all = "snake_case")
)
)]
pub enum ParquetTypeEnum {
// Each variant has a same-named variant in `ParquetTypeStructs`; see
// `ParquetTypeStructs::default_for_type` for the mapping.
MoveResource,
WriteSetChange,
Transaction,
TableItem,
MoveModule,
}

/// Typed container for the model structs of one Parquet type: each variant wraps a
/// `Vec` of the corresponding model.
///
/// `strum::EnumDiscriminants` generates a field-less companion enum named
/// `ParquetTypeStructName`; the `clap`, `serde` and `strum` attributes below all
/// render its variant names in snake_case.
#[derive(Clone, Debug, strum::EnumDiscriminants)]
#[strum(serialize_all = "snake_case")]
#[strum_discriminants(
derive(
Deserialize,
Serialize,
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
),
name(ParquetTypeStructName),
clap(rename_all = "snake_case"),
serde(rename_all = "snake_case"),
strum(serialize_all = "snake_case")
)]
pub enum ParquetTypeStructs {
// Variant names and order mirror `ParquetTypeEnum`; the test module asserts that the
// two generated discriminant name lists are equal.
MoveResource(Vec<MoveResource>),
WriteSetChange(Vec<WriteSetChangeModel>),
Transaction(Vec<ParquetTransaction>),
TableItem(Vec<TableItem>),
MoveModule(Vec<MoveModule>),
}

impl ParquetTypeStructs {
pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self {
match parquet_type {
ParquetTypeEnum::MoveResource => ParquetTypeStructs::MoveResource(Vec::new()),
ParquetTypeEnum::WriteSetChange => ParquetTypeStructs::WriteSetChange(Vec::new()),
ParquetTypeEnum::Transaction => ParquetTypeStructs::Transaction(Vec::new()),
ParquetTypeEnum::TableItem => ParquetTypeStructs::TableItem(Vec::new()),
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
}
}
}

#[cfg(test)]
mod test {
    use super::*;
    use strum::VariantNames;

    /// This test exists to make sure that when a new Parquet type is added, it is added
    /// to both `ParquetTypeEnum` and `ParquetTypeStructs`.
    ///
    /// To make sure this passes, declare the variants in the same order in both enums
    /// and make sure the names match.
    #[test]
    fn test_parquet_type_names_complete() {
        assert_eq!(
            ParquetTypeStructName::VARIANTS,
            ParquetTypeName::VARIANTS,
            "ParquetTypeStructs and ParquetTypeEnum must declare the same variants in the same order"
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
.config
.processor_config
.get_table_names()
.context(format!(
"Failed to get table names for the processor {}",
self.config.processor_config.name()
))?;
.context("Failed to get table names for the processor")?;

get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;
};
Expand Down

0 comments on commit f828135

Please sign in to comment.