[SDK-parquet] add parquet version tracker (#609)
### Description
**1. Added Parquet Version Tracker Functionality**
- Updated steps to include `set_backfill_table_flag` logic for selective table processing.
- Added a processor status saver for parquet (sketched below).
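As a rough illustration of the status-saver idea, here is a hypothetical sketch; the struct name, method, and in-memory store are stand-ins, not the actual implementation. The key shape matches the `<processor_name>.<table_name>` prefixing used for processor status rows:

```rust
use std::collections::HashMap;

// Hypothetical sketch: the real saver persists to the processor status table;
// a HashMap stands in for that store so the example is self-contained.
struct ParquetProcessorStatusSaver {
    processor_name: String,
}

impl ParquetProcessorStatusSaver {
    // Record the last successfully uploaded version for one parquet table,
    // keyed as "<processor_name>.<table_name>" to match the status-table rows.
    fn save_version(&self, store: &mut HashMap<String, u64>, table_name: &str, version: u64) {
        let key = format!("{}.{}", self.processor_name, table_name);
        store.insert(key, version);
    }
}

fn main() {
    let saver = ParquetProcessorStatusSaver {
        processor_name: "parquet_default_processor".to_string(),
    };
    let mut store = HashMap::new();
    saver.save_version(&mut store, "move_resources", 1_000_000);
    assert_eq!(store["parquet_default_processor.move_resources"], 1_000_000);
}
```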

**2. Schema and Table Handling Updates**
- Updated the logic for handling backfill tables:
  - Renamed the `tables` field to `backfill_table` in `ParquetDefaultProcessorConfig`.
  - Adjusted validations and logic to ensure only valid tables are processed (see the usage example after this list).
- `ParquetTypeEnum` improvements:
  - Added mappings and validations for table names.
  - Enhanced schema initialization and writer creation.
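A short usage sketch of the new `backfill_table` validation, mirroring the updated unit tests; the import path is an assumption about the crate layout:

```rust
use std::collections::HashSet;

// Hypothetical import path; adjust to wherever ProcessorConfig actually lives
// in the sdk-processor crate.
use sdk_processor::config::processor_config::{ParquetDefaultProcessorConfig, ProcessorConfig};

fn main() -> anyhow::Result<()> {
    // Backfill only `move_resources`; an empty `backfill_table` set means
    // every table the processor supports gets processed.
    let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
        backfill_table: HashSet::from(["move_resources".to_string()]),
        channel_size: 10,
        max_buffer_size: 100_000,
        upload_interval: 1800,
    });

    // Invalid names are rejected with an "Invalid table name ..." error.
    let table_names = config.get_processor_status_table_names()?;
    assert_eq!(table_names, vec!["move_resources".to_string()]);
    Ok(())
}
```

When `backfill_table` is empty, the same call instead returns every supported table name prefixed with the processor name (e.g. `parquet_default_processor.move_resources`).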
  
**3. Tests Updated**
- Modified tests to reflect changes in `backfill_table` handling and validation.
- Updated table name checks to ensure compatibility with the new logic.
- Added test coverage for invalid backfill tables.

**4. General Code Improvements**
- Removed redundant logic in `ParquetDefaultProcessor`.
- Moved shared functionality (e.g., writer creation) into reusable helper functions.
- Added `initialize_database_pool`, which centralizes database pool setup for Postgres and handles error cases cleanly.
- Added `initialize_gcs_client`, which abstracts GCS client setup using the provided credentials (see the sketch following this section).
- Consolidated initialization of schemas, writers, and GCS uploaders into modular functions.

Enhanced comments for better readability and maintainability.
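For illustration, a minimal sketch of what the GCS client helper could look like, assuming the `google-cloud-storage` crate; the actual helper's signature and error handling may differ:

```rust
use google_cloud_storage::client::{Client as GCSClient, ClientConfig};

// A sketch, assuming the `google-cloud-storage` crate; the real
// `initialize_gcs_client` may differ in signature and error handling.
async fn initialize_gcs_client(credentials: Option<String>) -> anyhow::Result<GCSClient> {
    // Surface explicit credentials the way Google's auth libraries expect;
    // without them, application-default credentials are used.
    if let Some(credentials) = credentials {
        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", credentials);
    }
    let gcs_config = ClientConfig::default().with_auth().await?;
    Ok(GCSClient::new(gcs_config))
}
```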


### Test Plan
![Screenshot 2024-11-15 at 12 35 53 PM](https://github.com/user-attachments/assets/34c8314b-bec2-4183-be51-23556911c9f4)


![Screenshot 2024-11-15 at 11.38.50 AM.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/XVbPtMdqqe4K1PNnLQvf/6ae31ace-50b1-42ce-b88b-03941a310e4a.png)

Backfill data-quality (DQ) check on the row count for `move_resources`:
![Screenshot 2024-11-18 at 11 12 27 AM](https://github.com/user-attachments/assets/1469d5d5-79e5-4387-aafa-9803b86400a5)
![Screenshot 2024-11-18 at 11 12 24 AM](https://github.com/user-attachments/assets/96377014-0350-4321-afcb-3e1baa7c3a6e)
yuunlimm authored Nov 20, 2024
1 parent 1af386d commit e3795ed
Showing 24 changed files with 682 additions and 330 deletions.
@@ -72,6 +72,7 @@ where
pub last_upload_time: Instant,
pub processor_name: String,
}

fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
let props = WriterProperties::builder()
.set_compression(parquet::basic::Compression::LZ4)
2 changes: 1 addition & 1 deletion rust/processor/src/processors/default_processor.rs
@@ -289,7 +289,7 @@ impl ProcessorTrait for DefaultProcessor {
}
}

// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
17 changes: 17 additions & 0 deletions rust/sdk-processor/src/config/db_config.rs
@@ -32,6 +32,7 @@ use serde::{Deserialize, Serialize};
)]
pub enum DbConfig {
PostgresConfig(PostgresConfig),
ParquetConfig(ParquetConfig),
}

#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -48,3 +49,19 @@ impl PostgresConfig {
150
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ParquetConfig {
pub connection_string: String,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
#[serde(default = "PostgresConfig::default_db_pool_size")]
pub db_pool_size: u32,
// Optional Google application credentials for authentication
#[serde(default)]
pub google_application_credentials: Option<String>,
#[serde(default)]
pub bucket_name: String,
#[serde(default)]
pub bucket_root: String,
}
180 changes: 90 additions & 90 deletions rust/sdk-processor/src/config/processor_config.rs
@@ -1,14 +1,21 @@
use crate::{
parquet_processors::ParquetTypeEnum,
processors::{
ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
},
utils::parquet_processor_table_mapping::VALID_TABLE_NAMES,
};
use ahash::AHashMap;
use processor::{
bq_analytics::generic_parquet_processor::NamedTable,
db::parquet::models::default_models::{
parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
parquet_move_tables::TableItem, parquet_transactions::Transaction,
parquet_write_set_changes::WriteSetChangeModel,
},
};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::IntoEnumIterator;

/// This enum captures the configs for all the different processors that are defined.
///
Expand All @@ -35,7 +42,8 @@ use strum::IntoEnumIterator;
strum::EnumVariantNames,
strum::IntoStaticStr,
strum::Display,
clap::ValueEnum
clap::ValueEnum,
strum::EnumIter
),
name(ProcessorName),
clap(rename_all = "snake_case"),
@@ -68,33 +76,26 @@ impl ProcessorConfig {
///
/// This is a convenience method to map the table names to include the processor name as a prefix, which
/// is useful for querying the status from the processor status table in the database.
pub fn get_table_names(&self) -> anyhow::Result<Vec<String>> {
pub fn get_processor_status_table_names(&self) -> anyhow::Result<Vec<String>> {
match self {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();
let processor_name = self.name();

// Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
let valid_table_names: HashSet<String> =
ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
let valid_table_names = VALID_TABLE_NAMES
.get(processor_name)
.ok_or_else(|| anyhow::anyhow!("Processor type not recognized"))?;

// Validate and map table names with prefix
let mut validated_table_names = Vec::new();
for table_name in &config.tables {
// Ensure the table name is a valid `ParquetTypeEnum` variant
if !valid_table_names.contains(table_name) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}

// Append the prefix to the validated table name
validated_table_names.push(Self::format_table_name(prefix, table_name));
// Use the helper function for validation and mapping
if config.backfill_table.is_empty() {
Ok(valid_table_names
.iter()
.cloned()
.map(|table_name| Self::format_table_name(processor_name, &table_name))
.collect())
} else {
Self::validate_backfill_table_names(&config.backfill_table, valid_table_names)
}

Ok(validated_table_names)
},
_ => Err(anyhow::anyhow!(
"Invalid parquet processor config: {:?}",
@@ -103,10 +104,44 @@ impl ProcessorConfig {
}
}

/// Get the set of table names to process for the given processor.
pub fn table_names(processor: &ProcessorName) -> HashSet<String> {
match processor {
ProcessorName::ParquetDefaultProcessor => HashSet::from([
Transaction::TABLE_NAME.to_string(),
MoveResource::TABLE_NAME.to_string(),
WriteSetChangeModel::TABLE_NAME.to_string(),
TableItem::TABLE_NAME.to_string(),
MoveModule::TABLE_NAME.to_string(),
]),
_ => HashSet::new(), // Default case for unsupported processors
}
}

/// helper function to format the table name with the processor name.
fn format_table_name(prefix: &str, table_name: &str) -> String {
format!("{}.{}", prefix, table_name)
}

/// This is to validate table_name for the backfill table
fn validate_backfill_table_names(
table_names: &HashSet<String>,
valid_table_names: &HashSet<String>,
) -> anyhow::Result<Vec<String>> {
table_names
.iter()
.map(|table_name| {
if !valid_table_names.contains(&table_name.to_lowercase()) {
return Err(anyhow::anyhow!(
"Invalid table name '{}'. Expected one of: {:?}",
table_name,
valid_table_names
));
}
Ok(table_name.clone())
})
.collect()
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -142,22 +177,15 @@ impl Default for DefaultProcessorConfig {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ParquetDefaultProcessorConfig {
// Optional Google application credentials for authentication
#[serde(default)]
pub google_application_credentials: Option<String>,
#[serde(default)]
pub bucket_name: String,
#[serde(default)]
pub bucket_root: String,
#[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
pub channel_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
pub max_buffer_size: usize,
#[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
pub parquet_upload_interval: u64,
// list of table names to backfill. Using HashSet for fast lookups, and for future extensibility.
pub upload_interval: u64,
// Set of table name to backfill. Using HashSet for fast lookups, and for future extensibility.
#[serde(default)]
pub tables: HashSet<String>,
pub backfill_table: HashSet<String>,
}

impl ParquetDefaultProcessorConfig {
@@ -185,41 +213,32 @@ mod tests {
#[test]
fn test_valid_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
backfill_table: HashSet::from(["move_resources".to_string()]),
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
upload_interval: 1800,
});

let result = config.get_table_names();
let result = config.get_processor_status_table_names();
assert!(result.is_ok());

let table_names = result.unwrap();
let table_names: HashSet<String> = table_names.into_iter().collect();
let expected_names: HashSet<String> =
["Transaction".to_string(), "MoveResource".to_string()]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
["move_resources".to_string()].iter().cloned().collect();
assert_eq!(table_names, expected_names);
}

#[test]
fn test_invalid_table_name() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
backfill_table: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]),
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
upload_interval: 1800,
});

let result = config.get_table_names();
let result = config.get_processor_status_table_names();
assert!(result.is_err());

let error_message = result.unwrap_err().to_string();
@@ -228,64 +247,45 @@ mod tests {
}

#[test]
fn test_empty_tables() {
fn test_empty_backfill_tables() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::new(),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
backfill_table: HashSet::new(),
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
upload_interval: 1800,
});
let result = config.get_table_names();
let result = config.get_processor_status_table_names();
assert!(result.is_ok());

let table_names = result.unwrap();
assert_eq!(table_names, Vec::<String>::new());
}
let expected_names: HashSet<String> = [
"move_resources".to_string(),
"transactions".to_string(),
"write_set_changes".to_string(),
"table_items".to_string(),
"move_modules".to_string(),
]
.iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();

#[test]
fn test_duplicate_table_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
});

let result = config.get_table_names();
assert!(result.is_ok());

let table_names = result.unwrap();
assert_eq!(table_names, vec![
"parquet_default_processor.Transaction".to_string(),
]);
let table_names: HashSet<String> = table_names.into_iter().collect();
assert_eq!(table_names, expected_names);
}

#[test]
fn test_all_enum_table_names() {
fn test_duplicate_table_names_in_backfill_names() {
let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
bucket_name: "bucket_name".to_string(),
bucket_root: "bucket_root".to_string(),
google_application_credentials: None,
backfill_table: HashSet::from(["transactions".to_string(), "transactions".to_string()]),
channel_size: 10,
max_buffer_size: 100000,
parquet_upload_interval: 1800,
upload_interval: 1800,
});

let result = config.get_table_names();
let result = config.get_processor_status_table_names();
assert!(result.is_ok());

let table_names = result.unwrap();
let expected_names: HashSet<String> = ParquetTypeEnum::iter()
.map(|e| format!("parquet_default_processor.{}", e))
.collect();
let table_names: HashSet<String> = table_names.into_iter().collect();
assert_eq!(table_names, expected_names);
assert_eq!(table_names, vec!["transactions".to_string(),]);
}
}