From f82813541405e9f0926e23e6af11fee7e3d56d38 Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:24:46 -0800 Subject: [PATCH] [parquet-sdk-migration] add a logic to determine the starting version for parquet processor (#587) [parquet-sdk-migration] add a logic to determine the starting version for parquet processor add docstring [Cand update the name of functions fanout builder failing with send trait --- .../src/config/processor_config.rs | 163 ++++++++++++++++-- .../src/parquet_processors/mod.rs | 86 +++++++++ .../parquet_default_processor.rs | 6 +- 3 files changed, 238 insertions(+), 17 deletions(-) diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 52ecc87c0..b490b73a4 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -1,10 +1,15 @@ -use crate::processors::{ - ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig, - stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig, +use crate::{ + parquet_processors::ParquetTypeEnum, + processors::{ + ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig, + stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig, + }, }; use ahash::AHashMap; use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use strum::IntoEnumIterator; + /// This enum captures the configs for all the different processors that are defined. /// /// The configs for each processor should only contain configuration specific to that @@ -63,23 +68,45 @@ impl ProcessorConfig { /// /// This is a convenience method to map the table names to include the processor name as a prefix, which /// is useful for querying the status from the processor status table in the database. - pub fn get_table_names(&self) -> Option> { + pub fn get_table_names(&self) -> anyhow::Result> { match self { ProcessorConfig::ParquetDefaultProcessor(config) => { // Get the processor name as a prefix let prefix = self.name(); - // Use the tables from the config and map them to include the prefix - Some( - config - .tables - .iter() - .map(|table_name| format!("{}_{}", prefix, table_name)) - .collect(), - ) + + // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup + let valid_table_names: HashSet = + ParquetTypeEnum::iter().map(|e| e.to_string()).collect(); + + // Validate and map table names with prefix + let mut validated_table_names = Vec::new(); + for table_name in &config.tables { + // Ensure the table name is a valid `ParquetTypeEnum` variant + if !valid_table_names.contains(table_name) { + return Err(anyhow::anyhow!( + "Invalid table name '{}'. Expected one of: {:?}", + table_name, + valid_table_names + )); + } + + // Append the prefix to the validated table name + validated_table_names.push(Self::format_table_name(prefix, table_name)); + } + + Ok(validated_table_names) }, - _ => None, // For all other processor types, return None + _ => Err(anyhow::anyhow!( + "Invalid parquet processor config: {:?}", + self + )), } } + + /// helper function to format the table name with the processor name. + fn format_table_name(prefix: &str, table_name: &str) -> String { + format!("{}.{}", prefix, table_name) + } } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -152,3 +179,113 @@ impl ParquetDefaultProcessorConfig { 1800 // 30 minutes } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_table_names() { + let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { + tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]), + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + google_application_credentials: None, + parquet_handler_response_channel_size: 10, + max_buffer_size: 100000, + parquet_upload_interval: 1800, + }); + + let result = config.get_table_names(); + assert!(result.is_ok()); + + let table_names = result.unwrap(); + + assert_eq!(table_names, vec![ + "parquet_default_processor.Transaction".to_string(), + "parquet_default_processor.MoveResource".to_string(), + ]); + } + + #[test] + fn test_invalid_table_name() { + let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { + tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]), + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + google_application_credentials: None, + parquet_handler_response_channel_size: 10, + max_buffer_size: 100000, + parquet_upload_interval: 1800, + }); + + let result = config.get_table_names(); + assert!(result.is_err()); + + let error_message = result.unwrap_err().to_string(); + assert!(error_message.contains("Invalid table name 'InvalidTable'")); + assert!(error_message.contains("Expected one of:")); + } + + #[test] + fn test_empty_tables() { + let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { + tables: HashSet::new(), + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + google_application_credentials: None, + parquet_handler_response_channel_size: 10, + max_buffer_size: 100000, + parquet_upload_interval: 1800, + }); + let result = config.get_table_names(); + assert!(result.is_ok()); + + let table_names = result.unwrap(); + assert_eq!(table_names, Vec::::new()); + } + + #[test] + fn test_duplicate_table_names() { + let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { + tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]), + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + google_application_credentials: None, + parquet_handler_response_channel_size: 10, + max_buffer_size: 100000, + parquet_upload_interval: 1800, + }); + + let result = config.get_table_names(); + assert!(result.is_ok()); + + let table_names = result.unwrap(); + assert_eq!(table_names, vec![ + "parquet_default_processor.Transaction".to_string(), + ]); + } + + #[test] + fn test_all_enum_table_names() { + let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { + tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(), + bucket_name: "bucket_name".to_string(), + bucket_root: "bucket_root".to_string(), + google_application_credentials: None, + parquet_handler_response_channel_size: 10, + max_buffer_size: 100000, + parquet_upload_interval: 1800, + }); + + let result = config.get_table_names(); + assert!(result.is_ok()); + + let table_names = result.unwrap(); + let expected_names: HashSet = ParquetTypeEnum::iter() + .map(|e| format!("parquet_default_processor.{}", e)) + .collect(); + let table_names: HashSet = table_names.into_iter().collect(); + assert_eq!(table_names, expected_names); + } +} diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 20654ab57..9c89dda25 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1 +1,87 @@ +use processor::db::common::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, +}; +use serde::{Deserialize, Serialize}; +use strum::{Display, EnumIter}; + pub mod parquet_default_processor; + +/// Enum representing the different types of Parquet files that can be processed. +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)] +#[cfg_attr( + test, + derive(strum::EnumDiscriminants), + strum_discriminants( + derive( + strum::EnumVariantNames, + Deserialize, + Serialize, + strum::IntoStaticStr, + strum::Display, + clap::ValueEnum + ), + name(ParquetTypeName), + strum(serialize_all = "snake_case") + ) +)] +pub enum ParquetTypeEnum { + MoveResource, + WriteSetChange, + Transaction, + TableItem, + MoveModule, +} + +#[derive(Clone, Debug, strum::EnumDiscriminants)] +#[strum(serialize_all = "snake_case")] +#[strum_discriminants( + derive( + Deserialize, + Serialize, + strum::EnumVariantNames, + strum::IntoStaticStr, + strum::Display, + clap::ValueEnum + ), + name(ParquetTypeStructName), + clap(rename_all = "snake_case"), + serde(rename_all = "snake_case"), + strum(serialize_all = "snake_case") +)] +pub enum ParquetTypeStructs { + MoveResource(Vec), + WriteSetChange(Vec), + Transaction(Vec), + TableItem(Vec), + MoveModule(Vec), +} + +impl ParquetTypeStructs { + pub fn default_for_type(parquet_type: &ParquetTypeEnum) -> Self { + match parquet_type { + ParquetTypeEnum::MoveResource => ParquetTypeStructs::MoveResource(Vec::new()), + ParquetTypeEnum::WriteSetChange => ParquetTypeStructs::WriteSetChange(Vec::new()), + ParquetTypeEnum::Transaction => ParquetTypeStructs::Transaction(Vec::new()), + ParquetTypeEnum::TableItem => ParquetTypeStructs::TableItem(Vec::new()), + ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use strum::VariantNames; + + /// This test exists to make sure that when a new processor is added, it is added + /// to both Processor and ProcessorConfig. + /// + /// To make sure this passes, make sure the variants are in the same order + /// (lexicographical) and the names match. + #[test] + fn test_parquet_type_names_complete() { + assert_eq!(ParquetTypeStructName::VARIANTS, ParquetTypeName::VARIANTS); + } +} diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 4799794c6..bb35ea276 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -74,10 +74,8 @@ impl ProcessorTrait for ParquetDefaultProcessor { .config .processor_config .get_table_names() - .context(format!( - "Failed to get table names for the processor {}", - self.config.processor_config.name() - ))?; + .context("Failed to get table names for the processor")?; + get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) .await?; };