From e3795ed7fc4b46730a1207450a0b10186c9078bd Mon Sep 17 00:00:00 2001
From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com>
Date: Wed, 20 Nov 2024 15:01:56 -0800
Subject: [PATCH] [SDK-parquet] add parquet version tracker (#609)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Description

**1. Added Parquet Version Tracker Functionality**
- Updated steps to include `set_backfill_table_flag` logic for selective table processing.
- Added a processor status saver for parquet.

**2. Schema and Table Handling Updates**
- Updated the logic for handling backfill tables:
  - Renamed the `tables` field to `backfill_table` in `ParquetDefaultProcessorConfig`.
  - Adjusted validations and logic to ensure only valid tables are processed.
- `ParquetTypeEnum` improvements:
  - Added mappings and validations for table names.
  - Enhanced schema initialization and writer creation.

**3. Tests Updated**
- Modified tests to reflect changes in `backfill_table` handling and validation.
- Updated table name checks to ensure compatibility with the new logic.
- Added test coverage for invalid backfill tables.

**4. General Code Improvements**
- Removed redundant logic in `ParquetDefaultProcessor`.
- Moved shared functionality (e.g., writer creation) into reusable helper functions:
  - `initialize_database_pool` centralizes database pool setup for Postgres and handles error cases cleanly.
  - `initialize_gcs_client` abstracts GCS client setup using the provided credentials.
- Consolidated initialization of schemas, writers, and GCS uploaders into modular functions.
- Enhanced comments for better readability and maintainability.

### Test Plan
![Screenshot 2024-11-15 at 12 35 53 PM](https://github.com/user-attachments/assets/34c8314b-bec2-4183-be51-23556911c9f4)
![Screenshot 2024-11-15 at 11.38.50 AM.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/XVbPtMdqqe4K1PNnLQvf/6ae31ace-50b1-42ce-b88b-03941a310e4a.png)

Backfill DQ check on number of rows for move_resources:
![Screenshot 2024-11-18 at 11 12 27 AM](https://github.com/user-attachments/assets/1469d5d5-79e5-4387-aafa-9803b86400a5)
![Screenshot 2024-11-18 at 11 12 24 AM](https://github.com/user-attachments/assets/96377014-0350-4321-afcb-3e1baa7c3a6e)
---
 .../bq_analytics/generic_parquet_processor.rs | 1 +
 .../src/processors/default_processor.rs | 2 +-
 rust/sdk-processor/src/config/db_config.rs | 17 ++
 .../src/config/processor_config.rs | 180 +++++++++---------
 .../src/parquet_processors/mod.rs | 120 ++++++++++--
 .../parquet_default_processor.rs | 147 ++++++--------
 .../account_transactions_processor.rs | 18 +-
 .../src/processors/ans_processor.rs | 18 +-
 .../src/processors/default_processor.rs | 18 +-
 .../src/processors/events_processor.rs | 18 +-
 .../processors/fungible_asset_processor.rs | 18 +-
 .../src/processors/monitoring_processor.rs | 18 +-
 .../src/processors/objects_processor.rs | 18 +-
 .../src/processors/stake_processor.rs | 18 +-
 .../src/processors/token_v2_processor.rs | 18 +-
 .../processors/user_transaction_processor.rs | 18 +-
 .../src/steps/common/gcs_uploader.rs | 51 +++--
 rust/sdk-processor/src/steps/common/mod.rs | 1 +
 .../src/steps/common/parquet_buffer_step.rs | 44 ++---
 .../common/parquet_version_tracker_step.rs | 161 ++++++++++++++++
 .../steps/common/processor_status_saver.rs | 68 ++++++-
 rust/sdk-processor/src/utils/mod.rs | 1 +
 .../utils/parquet_processor_table_mapping.rs | 17 ++
 .../src/utils/starting_version.rs | 22 ++-
 24 files changed, 682 insertions(+), 330 deletions(-)
 create mode 100644 
rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs create mode 100644 rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index b213fdeba..bfa9cddaf 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -72,6 +72,7 @@ where pub last_upload_time: Instant, pub processor_name: String, } + fn create_new_writer(schema: Arc) -> Result>> { let props = WriterProperties::builder() .set_compression(parquet::basic::Compression::LZ4) diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 6e37814c1..a7e0a81dd 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -289,7 +289,7 @@ impl ProcessorTrait for DefaultProcessor { } } -// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet) +// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet) /// Processes a list of transactions and extracts relevant data into different models. /// /// This function iterates over a list of transactions, extracting block metadata transactions, diff --git a/rust/sdk-processor/src/config/db_config.rs b/rust/sdk-processor/src/config/db_config.rs index 05c0b2632..68f1da8c4 100644 --- a/rust/sdk-processor/src/config/db_config.rs +++ b/rust/sdk-processor/src/config/db_config.rs @@ -32,6 +32,7 @@ use serde::{Deserialize, Serialize}; )] pub enum DbConfig { PostgresConfig(PostgresConfig), + ParquetConfig(ParquetConfig), } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -48,3 +49,19 @@ impl PostgresConfig { 150 } } + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ParquetConfig { + pub connection_string: String, + // Size of the pool for writes/reads to the DB. 
Limits maximum number of queries in flight + #[serde(default = "PostgresConfig::default_db_pool_size")] + pub db_pool_size: u32, + // Optional Google application credentials for authentication + #[serde(default)] + pub google_application_credentials: Option, + #[serde(default)] + pub bucket_name: String, + #[serde(default)] + pub bucket_root: String, +} diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 57d1af2c8..4dbb6d64a 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -1,14 +1,21 @@ use crate::{ - parquet_processors::ParquetTypeEnum, processors::{ ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig, stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig, }, + utils::parquet_processor_table_mapping::VALID_TABLE_NAMES, }; use ahash::AHashMap; +use processor::{ + bq_analytics::generic_parquet_processor::NamedTable, + db::parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction, + parquet_write_set_changes::WriteSetChangeModel, + }, +}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use strum::IntoEnumIterator; /// This enum captures the configs for all the different processors that are defined. /// @@ -35,7 +42,8 @@ use strum::IntoEnumIterator; strum::EnumVariantNames, strum::IntoStaticStr, strum::Display, - clap::ValueEnum + clap::ValueEnum, + strum::EnumIter ), name(ProcessorName), clap(rename_all = "snake_case"), @@ -68,33 +76,26 @@ impl ProcessorConfig { /// /// This is a convenience method to map the table names to include the processor name as a prefix, which /// is useful for querying the status from the processor status table in the database. - pub fn get_table_names(&self) -> anyhow::Result> { + pub fn get_processor_status_table_names(&self) -> anyhow::Result> { match self { ProcessorConfig::ParquetDefaultProcessor(config) => { // Get the processor name as a prefix - let prefix = self.name(); + let processor_name = self.name(); - // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup - let valid_table_names: HashSet = - ParquetTypeEnum::iter().map(|e| e.to_string()).collect(); + let valid_table_names = VALID_TABLE_NAMES + .get(processor_name) + .ok_or_else(|| anyhow::anyhow!("Processor type not recognized"))?; - // Validate and map table names with prefix - let mut validated_table_names = Vec::new(); - for table_name in &config.tables { - // Ensure the table name is a valid `ParquetTypeEnum` variant - if !valid_table_names.contains(table_name) { - return Err(anyhow::anyhow!( - "Invalid table name '{}'. 
Expected one of: {:?}", - table_name, - valid_table_names - )); - } - - // Append the prefix to the validated table name - validated_table_names.push(Self::format_table_name(prefix, table_name)); + // Use the helper function for validation and mapping + if config.backfill_table.is_empty() { + Ok(valid_table_names + .iter() + .cloned() + .map(|table_name| Self::format_table_name(processor_name, &table_name)) + .collect()) + } else { + Self::validate_backfill_table_names(&config.backfill_table, valid_table_names) } - - Ok(validated_table_names) }, _ => Err(anyhow::anyhow!( "Invalid parquet processor config: {:?}", @@ -103,10 +104,44 @@ impl ProcessorConfig { } } + /// Get the set of table names to process for the given processor. + pub fn table_names(processor: &ProcessorName) -> HashSet { + match processor { + ProcessorName::ParquetDefaultProcessor => HashSet::from([ + Transaction::TABLE_NAME.to_string(), + MoveResource::TABLE_NAME.to_string(), + WriteSetChangeModel::TABLE_NAME.to_string(), + TableItem::TABLE_NAME.to_string(), + MoveModule::TABLE_NAME.to_string(), + ]), + _ => HashSet::new(), // Default case for unsupported processors + } + } + /// helper function to format the table name with the processor name. fn format_table_name(prefix: &str, table_name: &str) -> String { format!("{}.{}", prefix, table_name) } + + /// This is to validate table_name for the backfill table + fn validate_backfill_table_names( + table_names: &HashSet, + valid_table_names: &HashSet, + ) -> anyhow::Result> { + table_names + .iter() + .map(|table_name| { + if !valid_table_names.contains(&table_name.to_lowercase()) { + return Err(anyhow::anyhow!( + "Invalid table name '{}'. Expected one of: {:?}", + table_name, + valid_table_names + )); + } + Ok(table_name.clone()) + }) + .collect() + } } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -142,22 +177,15 @@ impl Default for DefaultProcessorConfig { #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct ParquetDefaultProcessorConfig { - // Optional Google application credentials for authentication - #[serde(default)] - pub google_application_credentials: Option, - #[serde(default)] - pub bucket_name: String, - #[serde(default)] - pub bucket_root: String, #[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")] pub channel_size: usize, #[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")] pub max_buffer_size: usize, #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")] - pub parquet_upload_interval: u64, - // list of table names to backfill. Using HashSet for fast lookups, and for future extensibility. + pub upload_interval: u64, + // Set of table name to backfill. Using HashSet for fast lookups, and for future extensibility. 
#[serde(default)] - pub tables: HashSet, + pub backfill_table: HashSet, } impl ParquetDefaultProcessorConfig { @@ -185,41 +213,32 @@ mod tests { #[test] fn test_valid_table_names() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]), - bucket_name: "bucket_name".to_string(), - bucket_root: "bucket_root".to_string(), - google_application_credentials: None, + backfill_table: HashSet::from(["move_resources".to_string()]), channel_size: 10, max_buffer_size: 100000, - parquet_upload_interval: 1800, + upload_interval: 1800, }); - let result = config.get_table_names(); + let result = config.get_processor_status_table_names(); assert!(result.is_ok()); let table_names = result.unwrap(); let table_names: HashSet = table_names.into_iter().collect(); let expected_names: HashSet = - ["Transaction".to_string(), "MoveResource".to_string()] - .iter() - .map(|e| format!("parquet_default_processor.{}", e)) - .collect(); + ["move_resources".to_string()].iter().cloned().collect(); assert_eq!(table_names, expected_names); } #[test] fn test_invalid_table_name() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]), - bucket_name: "bucket_name".to_string(), - bucket_root: "bucket_root".to_string(), - google_application_credentials: None, + backfill_table: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]), channel_size: 10, max_buffer_size: 100000, - parquet_upload_interval: 1800, + upload_interval: 1800, }); - let result = config.get_table_names(); + let result = config.get_processor_status_table_names(); assert!(result.is_err()); let error_message = result.unwrap_err().to_string(); @@ -228,64 +247,45 @@ mod tests { } #[test] - fn test_empty_tables() { + fn test_empty_backfill_tables() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::new(), - bucket_name: "bucket_name".to_string(), - bucket_root: "bucket_root".to_string(), - google_application_credentials: None, + backfill_table: HashSet::new(), channel_size: 10, max_buffer_size: 100000, - parquet_upload_interval: 1800, + upload_interval: 1800, }); - let result = config.get_table_names(); + let result = config.get_processor_status_table_names(); assert!(result.is_ok()); let table_names = result.unwrap(); - assert_eq!(table_names, Vec::::new()); - } + let expected_names: HashSet = [ + "move_resources".to_string(), + "transactions".to_string(), + "write_set_changes".to_string(), + "table_items".to_string(), + "move_modules".to_string(), + ] + .iter() + .map(|e| format!("parquet_default_processor.{}", e)) + .collect(); - #[test] - fn test_duplicate_table_names() { - let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]), - bucket_name: "bucket_name".to_string(), - bucket_root: "bucket_root".to_string(), - google_application_credentials: None, - channel_size: 10, - max_buffer_size: 100000, - parquet_upload_interval: 1800, - }); - - let result = config.get_table_names(); - assert!(result.is_ok()); - - let table_names = result.unwrap(); - assert_eq!(table_names, vec![ - "parquet_default_processor.Transaction".to_string(), - ]); + let table_names: HashSet = table_names.into_iter().collect(); + assert_eq!(table_names, expected_names); } #[test] - fn 
test_all_enum_table_names() { + fn test_duplicate_table_names_in_backfill_names() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(), - bucket_name: "bucket_name".to_string(), - bucket_root: "bucket_root".to_string(), - google_application_credentials: None, + backfill_table: HashSet::from(["transactions".to_string(), "transactions".to_string()]), channel_size: 10, max_buffer_size: 100000, - parquet_upload_interval: 1800, + upload_interval: 1800, }); - let result = config.get_table_names(); + let result = config.get_processor_status_table_names(); assert!(result.is_ok()); let table_names = result.unwrap(); - let expected_names: HashSet = ParquetTypeEnum::iter() - .map(|e| format!("parquet_default_processor.{}", e)) - .collect(); - let table_names: HashSet = table_names.into_iter().collect(); - assert_eq!(table_names, expected_names); + assert_eq!(table_names, vec!["transactions".to_string(),]); } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 218c9d8ac..a37e4efe3 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,16 +1,37 @@ +use crate::{ + config::db_config::DbConfig, + steps::common::{ + gcs_uploader::{create_new_writer, GCSUploader}, + parquet_buffer_step::ParquetBufferStep, + }, + utils::database::{new_db_pool, ArcDbPool}, +}; use aptos_indexer_processor_sdk::utils::errors::ProcessorError; -use processor::db::parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; +use parquet::schema::types::Type; +use processor::{ + db::parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + worker::TableFlags, }; use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use strum::{Display, EnumIter}; pub mod parquet_default_processor; +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; + /// Enum representing the different types of Parquet files that can be processed. 
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)] +#[strum(serialize_all = "snake_case")] #[cfg_attr( test, derive(strum::EnumDiscriminants), @@ -70,16 +91,6 @@ impl ParquetTypeStructs { } } - pub fn get_table_name(&self) -> &'static str { - match self { - ParquetTypeStructs::MoveResource(_) => "move_resources", - ParquetTypeStructs::WriteSetChange(_) => "write_set_changes", - ParquetTypeStructs::Transaction(_) => "transactions", - ParquetTypeStructs::TableItem(_) => "table_items", - ParquetTypeStructs::MoveModule(_) => "move_modules", - } - } - pub fn calculate_size(&self) -> usize { match self { ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data), @@ -123,6 +134,87 @@ impl ParquetTypeStructs { } } +async fn initialize_gcs_client(credentials: Option) -> Arc { + if let Some(credentials) = credentials { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + + Arc::new(GCSClient::new(gcs_config)) +} + +async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result { + match config { + DbConfig::ParquetConfig(ref parquet_config) => { + let conn_pool = new_db_pool( + &parquet_config.connection_string, + Some(parquet_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for ParquetConfig: {:?}", + e + ) + })?; + + Ok(conn_pool) + }, + _ => Err(anyhow::anyhow!("Invalid db config for Parquet Processor")), + } +} + +async fn initialize_parquet_buffer_step( + gcs_client: Arc, + parquet_type_to_schemas: HashMap>, + upload_interval: u64, + max_buffer_size: usize, + bucket_name: String, + bucket_root: String, + processor_name: String, +) -> anyhow::Result { + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + let buffer_uploader = GCSUploader::new( + gcs_client, + parquet_type_to_schemas, + parquet_type_to_writer, + bucket_name, + bucket_root, + processor_name, + )?; + + let default_size_buffer_step = ParquetBufferStep::new( + Duration::from_secs(upload_interval), + buffer_uploader, + max_buffer_size, + ); + + Ok(default_size_buffer_step) +} + +fn set_backfill_table_flag(table_names: HashSet) -> TableFlags { + let mut backfill_table = TableFlags::empty(); + + for table_name in table_names.iter() { + if let Some(flag) = TableFlags::from_name(table_name) { + println!("Setting backfill table flag: {:?}", flag); + backfill_table |= flag; + } + } + backfill_table +} + #[cfg(test)] mod test { use super::*; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index c6c0e5e62..31aefd816 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -3,28 +3,30 @@ use crate::{ db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, processor_config::ProcessorConfig, }, - parquet_processors::ParquetTypeEnum, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, steps::{ common::{ - gcs_uploader::{create_new_writer, GCSUploader}, - parquet_buffer_step::ParquetBufferStep, + 
parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, }, parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, }, utils::{ chain_id::check_or_update_chain_id, - database::{new_db_pool, run_migrations, ArcDbPool}, - starting_version::{get_min_last_success_version_parquet, get_starting_version}, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, }, }; use anyhow::Context; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::TransactionStreamStep, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; -use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, @@ -33,40 +35,19 @@ use processor::{ parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, parquet_write_set_changes::WriteSetChangeModel, }, - worker::TableFlags, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; -const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; - pub struct ParquetDefaultProcessor { pub config: IndexerProcessorConfig, - pub db_pool: ArcDbPool, // for processor status + pub db_pool: ArcDbPool, } impl ParquetDefaultProcessor { pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { - match config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - let conn_pool = new_db_pool( - &postgres_config.connection_string, - Some(postgres_config.db_pool_size), - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to create connection pool for PostgresConfig: {:?}", - e - ) - })?; - - Ok(Self { - config, - db_pool: conn_pool, - }) - }, - } + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) } } @@ -78,33 +59,21 @@ impl ProcessorTrait for ParquetDefaultProcessor { async fn run_processor(&self) -> anyhow::Result<()> { // Run Migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { run_migrations( - postgres_config.connection_string.clone(), + parquet_config.connection_string.clone(), self.db_pool.clone(), ) .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetDefaultProcessor {:?}", + self.config.db_config + )); }, - } - - // Determine the processing mode (backfill or regular) - let is_backfill = self.config.backfill_config.is_some(); - - // TODO: Revisit when parquet version tracker is available. - // Query the starting version - let starting_version = if is_backfill { - get_starting_version(&self.config, self.db_pool.clone()).await? - } else { - // Regular mode logic: Fetch the minimum last successful version across all relevant tables - let table_names = self - .config - .processor_config - .get_table_names() - .context("Failed to get table names for the processor")?; - - get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) - .await? 
}; // Check and update the ledger chain id to ensure we're indexing the correct chain @@ -126,6 +95,19 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, }; + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), @@ -133,24 +115,13 @@ impl ProcessorTrait for ParquetDefaultProcessor { }) .await?; + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); let parquet_default_extractor = ParquetDefaultExtractor { - opt_in_tables: TableFlags::empty(), + opt_in_tables: backfill_table, }; - let credentials = parquet_processor_config - .google_application_credentials - .clone(); - - if let Some(credentials) = credentials { - std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); - } - - let gcs_config = GcsClientConfig::default() - .with_auth() - .await - .expect("Failed to create GCS client config"); - - let gcs_client = Arc::new(GCSClient::new(gcs_config)); + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; let parquet_type_to_schemas: HashMap> = [ (ParquetTypeEnum::MoveResource, MoveResource::schema()), @@ -165,37 +136,37 @@ impl ProcessorTrait for ParquetDefaultProcessor { .into_iter() .collect(); - let parquet_type_to_writer = parquet_type_to_schemas - .iter() - .map(|(key, schema)| { - let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); - (*key, writer) - }) - .collect(); - - let buffer_uploader = GCSUploader::new( + let default_size_buffer_step = initialize_parquet_buffer_step( gcs_client.clone(), parquet_type_to_schemas, - parquet_type_to_writer, - parquet_processor_config.bucket_name.clone(), - parquet_processor_config.bucket_root.clone(), + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), self.name().to_string(), - )?; + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); let channel_size = parquet_processor_config.channel_size; - let default_size_buffer_step = ParquetBufferStep::new( - Duration::from_secs(parquet_processor_config.parquet_upload_interval), - buffer_uploader, - parquet_processor_config.max_buffer_size, - ); - // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) .end_and_return_output_receiver(channel_size); loop { diff --git a/rust/sdk-processor/src/processors/account_transactions_processor.rs b/rust/sdk-processor/src/processors/account_transactions_processor.rs index 7260c3c02..ff5e2624a 100644 --- 
a/rust/sdk-processor/src/processors/account_transactions_processor.rs +++ b/rust/sdk-processor/src/processors/account_transactions_processor.rs @@ -50,6 +50,10 @@ impl AccountTransactionsProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for AccountTransactionsProcessor {:?}", + config.db_config + )), } } } @@ -62,14 +66,12 @@ impl ProcessorTrait for AccountTransactionsProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB. diff --git a/rust/sdk-processor/src/processors/ans_processor.rs b/rust/sdk-processor/src/processors/ans_processor.rs index fbc5605a8..23f8c0d87 100644 --- a/rust/sdk-processor/src/processors/ans_processor.rs +++ b/rust/sdk-processor/src/processors/ans_processor.rs @@ -63,6 +63,10 @@ impl AnsProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for ANS Processor {:?}", + config.db_config + )), } } } @@ -75,14 +79,12 @@ impl ProcessorTrait for AnsProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB. 
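The per-processor diffs above and below all repeat one two-part pattern: `new()` now ends with a catch-all arm that rejects any non-Postgres `DbConfig` (the match stops being exhaustive once the `ParquetConfig` variant exists), and `run_processor()` swaps the exhaustive match for an `if let`, since only the Postgres variant needs migrations. A condensed sketch of that pattern, using a hypothetical `ExampleProcessor` in place of the concrete processor types (imports mirror the paths used elsewhere in this patch):

```rust
use crate::{
    config::{db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig},
    utils::database::{new_db_pool, run_migrations, ArcDbPool},
};

// Hypothetical stand-in; each concrete processor in this patch follows the same shape,
// differing only in the processor name used in the error message.
pub struct ExampleProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
}

impl ExampleProcessor {
    pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> {
        match config.db_config {
            DbConfig::PostgresConfig(ref postgres_config) => {
                // Postgres-backed processors still build their connection pool here.
                let conn_pool = new_db_pool(
                    &postgres_config.connection_string,
                    Some(postgres_config.db_pool_size),
                )
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create connection pool: {:?}", e))?;
                Ok(Self {
                    config,
                    db_pool: conn_pool,
                })
            },
            // New catch-all arm: with ParquetConfig added to DbConfig, anything other
            // than a Postgres config is rejected explicitly.
            _ => Err(anyhow::anyhow!(
                "Invalid db config for ExampleProcessor {:?}",
                config.db_config
            )),
        }
    }

    async fn run_migrations_if_postgres(&self) {
        // In run_processor(), the exhaustive match becomes an `if let`, since only
        // the Postgres variant needs migrations.
        if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config {
            run_migrations(
                postgres_config.connection_string.clone(),
                self.db_pool.clone(),
            )
            .await;
        }
    }
}
```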
diff --git a/rust/sdk-processor/src/processors/default_processor.rs b/rust/sdk-processor/src/processors/default_processor.rs index e27dd0d0b..fcf5dc954 100644 --- a/rust/sdk-processor/src/processors/default_processor.rs +++ b/rust/sdk-processor/src/processors/default_processor.rs @@ -52,6 +52,10 @@ impl DefaultProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for DefaultProcessor {:?}", + config.db_config + )), } } } @@ -64,14 +68,12 @@ impl ProcessorTrait for DefaultProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 124ed0eb9..849a0f411 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -50,6 +50,10 @@ impl EventsProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for EventsProcessor {:?}", + config.db_config + )), } } } @@ -62,14 +66,12 @@ impl ProcessorTrait for EventsProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index a7c16779d..0e53bc814 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -54,6 +54,10 @@ impl FungibleAssetProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for FungibleAssetProcessor {:?}", + config.db_config + )), } } } @@ -66,14 +70,12 @@ impl ProcessorTrait for FungibleAssetProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/monitoring_processor.rs b/rust/sdk-processor/src/processors/monitoring_processor.rs index c18d81838..477760757 100644 --- a/rust/sdk-processor/src/processors/monitoring_processor.rs +++ b/rust/sdk-processor/src/processors/monitoring_processor.rs @@ -47,6 +47,10 @@ impl MonitoringProcessor { db_pool: conn_pool, }) }, + _ => 
Err(anyhow::anyhow!( + "Invalid db config for MonitoringProcessor {:?}", + config.db_config + )), } } } @@ -60,14 +64,12 @@ impl ProcessorTrait for MonitoringProcessor { /// This processor no-ops and is used for monitoring purposes. async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs index d69945dd3..2e19df342 100644 --- a/rust/sdk-processor/src/processors/objects_processor.rs +++ b/rust/sdk-processor/src/processors/objects_processor.rs @@ -75,6 +75,10 @@ impl ObjectsProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for ObjectsProcessor {:?}", + config.db_config + )), } } } @@ -87,14 +91,12 @@ impl ProcessorTrait for ObjectsProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/stake_processor.rs b/rust/sdk-processor/src/processors/stake_processor.rs index 4ae5e322a..1d58452c8 100644 --- a/rust/sdk-processor/src/processors/stake_processor.rs +++ b/rust/sdk-processor/src/processors/stake_processor.rs @@ -75,6 +75,10 @@ impl StakeProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for StakeProcessor {:?}", + config.db_config + )), } } } @@ -87,14 +91,12 @@ impl ProcessorTrait for StakeProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/token_v2_processor.rs b/rust/sdk-processor/src/processors/token_v2_processor.rs index 0f75af7c2..5329f33d6 100644 --- a/rust/sdk-processor/src/processors/token_v2_processor.rs +++ b/rust/sdk-processor/src/processors/token_v2_processor.rs @@ -76,6 +76,10 @@ impl TokenV2Processor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for TokenV2Processor {:?}", + config.db_config + )), } } } @@ -88,14 +92,12 @@ impl ProcessorTrait for TokenV2Processor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - 
postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/processors/user_transaction_processor.rs b/rust/sdk-processor/src/processors/user_transaction_processor.rs index 73d08f5e3..e08cd06cb 100644 --- a/rust/sdk-processor/src/processors/user_transaction_processor.rs +++ b/rust/sdk-processor/src/processors/user_transaction_processor.rs @@ -51,6 +51,10 @@ impl UserTransactionProcessor { db_pool: conn_pool, }) }, + _ => Err(anyhow::anyhow!( + "Invalid db config for UserTransactionProcessor {:?}", + config.db_config + )), } } } @@ -63,14 +67,12 @@ impl ProcessorTrait for UserTransactionProcessor { async fn run_processor(&self) -> Result<()> { // Run migrations - match self.config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - run_migrations( - postgres_config.connection_string.clone(), - self.db_pool.clone(), - ) - .await; - }, + if let DbConfig::PostgresConfig(ref postgres_config) = self.config.db_config { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; } // Merge the starting version from config and the latest processed version from the DB diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index c6d8f435f..fca6b293f 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -38,28 +38,46 @@ impl Uploadable for GCSUploader { &mut self, buffer: ParquetTypeStructs, ) -> anyhow::Result<(), ProcessorError> { - let table_name = buffer.get_table_name(); - let result = match buffer { ParquetTypeStructs::Transaction(transactions) => { - self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name) - .await + self.upload_generic( + &transactions[..], + ParquetTypeEnum::Transaction, + &ParquetTypeEnum::Transaction.to_string(), + ) + .await }, ParquetTypeStructs::MoveResource(resources) => { - self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name) - .await + self.upload_generic( + &resources[..], + ParquetTypeEnum::MoveResource, + &ParquetTypeEnum::MoveResource.to_string(), + ) + .await }, ParquetTypeStructs::WriteSetChange(changes) => { - self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name) - .await + self.upload_generic( + &changes[..], + ParquetTypeEnum::WriteSetChange, + &ParquetTypeEnum::WriteSetChange.to_string(), + ) + .await }, ParquetTypeStructs::TableItem(items) => { - self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name) - .await + self.upload_generic( + &items[..], + ParquetTypeEnum::TableItem, + &ParquetTypeEnum::TableItem.to_string(), + ) + .await }, ParquetTypeStructs::MoveModule(modules) => { - self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name) - .await + self.upload_generic( + &modules[..], + ParquetTypeEnum::MoveModule, + &ParquetTypeEnum::MoveModule.to_string(), + ) + .await }, }; @@ -141,7 +159,7 @@ impl GCSUploader { &mut self, data: &[ParquetType], parquet_type: ParquetTypeEnum, - table_name: &'static str, + table_name: &str, ) -> anyhow::Result<()> where ParquetType: HasVersion + GetTimeStamp + HasParquetSchema, @@ -184,6 +202,13 @@ impl 
GCSUploader { ) .await?; + debug!( + "Uploaded parquet to GCS for table: {}, start_version: {}, end_version: {}", + table_name, + data[0].version(), + data[data.len() - 1].version() + ); + Ok(()) } } diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs index 18c449997..91d0d9f88 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1,5 +1,6 @@ pub mod gcs_uploader; pub mod parquet_buffer_step; +pub mod parquet_version_tracker_step; pub mod processor_status_saver; pub use processor_status_saver::get_processor_status_saver; diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs index 315873f51..f123a5b5c 100644 --- a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs +++ b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs @@ -39,7 +39,7 @@ impl ParquetBuffer { cur_batch_metadata: &TransactionMetadata, ) -> Result<(), ProcessorError> { if let Some(buffer_metadata) = &mut self.current_batch_metadata { - if buffer_metadata.end_version != cur_batch_metadata.start_version { + if buffer_metadata.end_version + 1 != cur_batch_metadata.start_version { // this shouldn't happen but if it does, we want to know return Err(ProcessorError::ProcessError { message: format!( @@ -289,7 +289,7 @@ impl NamedStep for ParquetBufferStep { #[cfg(test)] mod tests { use crate::{ - config::processor_config::ParquetDefaultProcessorConfig, + config::db_config::ParquetConfig, steps::common::{ gcs_uploader::{create_new_writer, GCSUploader}, parquet_buffer_step::{ParquetBufferStep, ParquetTypeEnum, ParquetTypeStructs}, @@ -305,22 +305,15 @@ mod tests { bq_analytics::generic_parquet_processor::HasParquetSchema, db::parquet::models::default_models::parquet_move_resources::MoveResource, }; - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, - }; + use std::{collections::HashMap, sync::Arc, time::Duration}; #[tokio::test] #[allow(clippy::needless_return)] async fn test_parquet_buffer_step_no_upload() -> anyhow::Result<()> { - let poll_interval = Duration::from_secs(10); - let buffer_max_size = 100; - let parquet_processor_config = create_parquet_processor_config(); - - let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?; + let db_config = create_parquet_db_config(); + let buffer_uploader = create_parquet_uploader(&db_config).await?; let mut parquet_step = - ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size); + ParquetBufferStep::new(Duration::from_secs(10), buffer_uploader, 100); let data = HashMap::from([( ParquetTypeEnum::MoveResource, @@ -344,13 +337,12 @@ mod tests { #[tokio::test] #[allow(clippy::needless_return)] async fn test_parquet_buffer_step_trigger_upload() -> anyhow::Result<()> { - let poll_interval = Duration::from_secs(10); let buffer_max_size = 25; // Default ParquetTypeStructs for MoveResource is 24 bytes - let parquet_processor_config = create_parquet_processor_config(); + let db_config = create_parquet_db_config(); - let buffer_uploader = create_parquet_uploader(&parquet_processor_config).await?; + let buffer_uploader = create_parquet_uploader(&db_config).await?; let mut parquet_step = - ParquetBufferStep::new(poll_interval, buffer_uploader, buffer_max_size); + ParquetBufferStep::new(Duration::from_secs(10), buffer_uploader, buffer_max_size); // Test data below `buffer_max_size` let data = HashMap::from([( @@ -388,9 +380,7 @@ mod tests { 
Ok(()) } - async fn create_parquet_uploader( - parquet_processor_config: &ParquetDefaultProcessorConfig, - ) -> anyhow::Result { + async fn create_parquet_uploader(db_config: &ParquetConfig) -> anyhow::Result { let gcs_config = GcsClientConfig::default() .with_auth() .await @@ -414,21 +404,19 @@ mod tests { gcs_client, parquet_type_to_schemas, parquet_type_to_writer, - parquet_processor_config.bucket_name.clone(), - parquet_processor_config.bucket_root.clone(), + db_config.bucket_name.clone(), + db_config.bucket_root.clone(), "processor_name".to_string(), ) } - fn create_parquet_processor_config() -> ParquetDefaultProcessorConfig { - ParquetDefaultProcessorConfig { + fn create_parquet_db_config() -> ParquetConfig { + ParquetConfig { + connection_string: "connection_string".to_string(), + db_pool_size: 10, bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), - parquet_upload_interval: 180, - max_buffer_size: 100, - channel_size: 100, google_application_credentials: None, - tables: HashSet::new(), } } } diff --git a/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs new file mode 100644 index 000000000..7b8be969b --- /dev/null +++ b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs @@ -0,0 +1,161 @@ +use crate::parquet_processors::ParquetTypeEnum; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{ + pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable, + }, + types::transaction_context::{TransactionContext, TransactionMetadata}, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use std::collections::HashMap; +use tracing::debug; + +/// The ParquetProcessorStatusSaver trait object is intended to save +/// the latest successfully processed transaction version to storage, +/// ensuring that the processor_status is persistently stored. +#[async_trait] +pub trait ParquetProcessorStatusSaver { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError>; +} + +/// Tracks the versioned processing of sequential transactions, ensuring no gaps +/// occur between them. +/// +/// Important: this step assumes ordered transactions. Please use the `OrederByVersionStep` before this step +/// if the transactions are not ordered. +pub struct ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. 
+ last_success_batch: HashMap>, + polling_interval_secs: u64, + processor_status_saver: S, +} + +impl ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + pub fn new(processor_status_saver: S, polling_interval_secs: u64) -> Self { + Self { + last_success_batch: HashMap::new(), + processor_status_saver, + polling_interval_secs, + } + } + + async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { + for (parquet_type, last_success_batch) in &self.last_success_batch { + let table_name = parquet_type.to_string(); + self.processor_status_saver + .save_parquet_processor_status(last_success_batch, &table_name) + .await?; + } + Ok(()) + } +} + +#[async_trait] +impl Processable for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + type Input = HashMap; + type Output = (); + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + current_batch: TransactionContext, + ) -> Result>, ProcessorError> { + let mut processed_data = HashMap::new(); + + // Check for version gap before processing each key-value pair + for (parquet_type, current_metadata) in ¤t_batch.data { + // we need to have a map of last_success_bath for parquet-Type as well. + // if there is a last_success_batch for the current parquet-Type then we need to check the version gap + debug!( + "checking for parquet_type: {:?} with start version {}, end_version {}", + parquet_type.to_string(), + current_metadata.start_version, + current_metadata.end_version + ); + if let Some(last_success) = self.last_success_batch.get(parquet_type) { + if last_success.metadata.end_version + 1 != current_metadata.start_version { + return Err(ProcessorError::ProcessError { + message: format!( + "Gap detected for {:?} starting from version: {}", + &parquet_type.to_string(), + current_metadata.start_version + ), + }); + } + } + + processed_data.insert(*parquet_type, current_metadata.clone()); + + // Update last_success_batch for the current key + self.last_success_batch + .entry(*parquet_type) + .and_modify(|e| { + e.data = (); + e.metadata = current_metadata.clone(); + }) + .or_insert(TransactionContext { + data: (), + metadata: current_metadata.clone(), + }); + } + + // Pass through the current batch with updated metadata + Ok(Some(TransactionContext { + data: (), + metadata: current_batch.metadata.clone(), + })) + } + + async fn cleanup( + &mut self, + ) -> Result>>, ProcessorError> { + // Save the last successful batch to the database + self.save_processor_status().await?; + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for ParquetVersionTrackerStep +where + Self: Sized + Send + Sync + 'static, + S: ParquetProcessorStatusSaver + Send + Sync + 'static, +{ + fn poll_interval(&self) -> std::time::Duration { + std::time::Duration::from_secs(self.polling_interval_secs) + } + + async fn poll(&mut self) -> Result>>, ProcessorError> { + // TODO: Add metrics for gap count + self.save_processor_status().await?; + // Nothing should be returned + Ok(None) + } +} + +impl NamedStep for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + fn name(&self) -> String { + "ParquetVersionTrackerStep".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 50bb39ae3..b958acae9 100644 --- 
a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -1,9 +1,10 @@ use crate::{ - config::indexer_processor_config::IndexerProcessorConfig, + config::{db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig}, db::common::models::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, }, + steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -33,15 +34,22 @@ pub fn get_processor_status_saver( } } else { let processor_name = config.processor_config.name().to_string(); - ProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, + if let DbConfig::ParquetConfig(_) = config.db_config { + ProcessorStatusSaverEnum::Parquet { + conn_pool, + processor_name, + } + } else { + ProcessorStatusSaverEnum::Postgres { + conn_pool, + processor_name, + } } } } pub enum ProcessorStatusSaverEnum { - Default { + Postgres { conn_pool: ArcDbPool, processor_name: String, }, @@ -51,6 +59,10 @@ pub enum ProcessorStatusSaverEnum { backfill_start_version: Option, backfill_end_version: Option, }, + Parquet { + conn_pool: ArcDbPool, + processor_name: String, + }, } #[async_trait] @@ -58,6 +70,32 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { async fn save_processor_status( &self, last_success_batch: &TransactionContext<()>, + ) -> Result<(), ProcessorError> { + self.save_processor_status_with_optional_table_names(last_success_batch, None) + .await + } +} + +#[async_trait] +impl ParquetProcessorStatusSaver for ProcessorStatusSaverEnum { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError> { + self.save_processor_status_with_optional_table_names( + last_success_batch, + Some(table_name.to_string()), + ) + .await + } +} + +impl ProcessorStatusSaverEnum { + async fn save_processor_status_with_optional_table_names( + &self, + last_success_batch: &TransactionContext<()>, + table_name: Option, ) -> Result<(), ProcessorError> { let end_timestamp = last_success_batch .metadata @@ -66,12 +104,22 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) .map(|t| t.naive_utc()); match self { - ProcessorStatusSaverEnum::Default { + ProcessorStatusSaverEnum::Postgres { + conn_pool, + processor_name, + } + | ProcessorStatusSaverEnum::Parquet { conn_pool, processor_name, } => { + let processor_name = if table_name.is_some() { + format!("{}_{}", processor_name, table_name.unwrap()) + } else { + processor_name.clone() + }; + let status = ProcessorStatus { - processor: processor_name.clone(), + processor: processor_name, last_success_version: last_success_batch.metadata.end_version as i64, last_transaction_timestamp: end_timestamp, }; @@ -92,7 +140,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { )), Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), ) - .await?; + .await?; Ok(()) }, @@ -140,9 +188,9 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { backfill_processor_status::backfill_end_version .eq(excluded(backfill_processor_status::backfill_end_version)), )), - Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), + Some(" WHERE backfill_processor_status.last_success_version <= 
EXCLUDED.last_success_version "), ) - .await?; + .await?; Ok(()) }, } diff --git a/rust/sdk-processor/src/utils/mod.rs b/rust/sdk-processor/src/utils/mod.rs index 252b6f0b2..eb4865fc3 100644 --- a/rust/sdk-processor/src/utils/mod.rs +++ b/rust/sdk-processor/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod chain_id; pub mod database; pub mod parquet_extractor_helper; +pub mod parquet_processor_table_mapping; pub mod starting_version; diff --git a/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs new file mode 100644 index 000000000..3b695503d --- /dev/null +++ b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs @@ -0,0 +1,17 @@ +use crate::config::processor_config::{ProcessorConfig, ProcessorName}; +use lazy_static::lazy_static; +use std::collections::{HashMap, HashSet}; +use strum::IntoEnumIterator; + +lazy_static! { + pub static ref VALID_TABLE_NAMES: HashMap> = { + let mut map = HashMap::new(); + for processor_name in ProcessorName::iter() { + map.insert( + processor_name.to_string(), + ProcessorConfig::table_names(&processor_name), + ); + } + map + }; +} diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 8a69ab77d..e2f821de3 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -49,12 +49,17 @@ pub async fn get_starting_version( pub async fn get_min_last_success_version_parquet( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result { - let min_processed_version = - get_min_processed_version_from_db(conn_pool.clone(), processor_names) + let min_processed_version = if indexer_processor_config.backfill_config.is_some() { + get_starting_version_from_db(indexer_processor_config, conn_pool.clone()) .await - .context("Failed to get minimum last success version from DB")?; + .context("Failed to get latest processed version from DB")? + } else { + get_min_processed_version_from_db(conn_pool.clone(), table_names) + .await + .context("Failed to get minimum last success version from DB")? + }; // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. Ok(min_processed_version.unwrap_or( @@ -65,14 +70,19 @@ pub async fn get_min_last_success_version_parquet( )) } +/// Get the minimum last success version from the database for the given processors. +/// +/// This should return the minimum of the last success version of the processors in the list. +/// If any of the tables handled by the parquet processor has no entry, it should use 0 as a default value. +/// To avoid skipping any versions, the minimum of the last success version should be used as the starting version. async fn get_min_processed_version_from_db( conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result> { let mut queries = Vec::new(); // Spawn all queries concurrently with separate connections - for processor_name in processor_names { + for processor_name in table_names { let conn_pool = conn_pool.clone(); let processor_name = processor_name.clone();