From 461caa226938c27aec4f1816f78d1e687204c70c Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Thu, 31 Oct 2024 10:35:27 -0700 Subject: [PATCH] add docstring and update the name of functions --- .../src/config/processor_config.rs | 2 +- .../parquet_default_processor.rs | 5 +- .../src/utils/starting_version.rs | 54 ++++++++++++------- 3 files changed, 37 insertions(+), 24 deletions(-) diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index b8105f4c7..db245d1a4 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -57,6 +57,7 @@ impl ProcessorConfig { } /// Get the Vec of table names for parquet processors only. + /// /// This is a convenience method to map the table names to include the processor name as a prefix, which /// is useful for querying the status from the processor status table in the database. pub fn get_table_names(&self) -> Option<Vec<String>> { @@ -65,7 +66,6 @@ impl ProcessorConfig { // Get the processor name as a prefix let prefix = self.name(); // Use the tables from the config and map them to include the prefix - println!("config.tables: {:?}", prefix); Some( config .tables diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 2919ed38f..4799794c6 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -6,7 +6,7 @@ use crate::{ utils::{ chain_id::check_or_update_chain_id, database::{new_db_pool, run_migrations, ArcDbPool}, - starting_version::{get_minimum_last_success_version, get_starting_version}, + starting_version::{get_min_last_success_version_parquet, get_starting_version}, }, }; use anyhow::Context; @@ -62,7 +62,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, } - // TODO: Starting version from 
config. 0 if not set. // Determine the processing mode (backfill or regular) let is_backfill = self.config.backfill_config.is_some(); @@ -79,7 +78,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { "Failed to get table names for the processor {}", self.config.processor_config.name() ))?; - get_minimum_last_success_version(&self.config, self.db_pool.clone(), table_names) + get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) .await?; }; diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 8e381e22b..8a69ab77d 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -42,14 +42,19 @@ pub async fn get_starting_version( )) } -pub async fn get_minimum_last_success_version( +/// Get the appropriate minimum last success version for the parquet processors. +/// +/// This will return the minimum of the last success version of the processors in the list. +/// If no processor has a checkpoint, this will return the `starting_version` from the config, or 0 if not set. +pub async fn get_min_last_success_version_parquet( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, processor_names: Vec<String>, ) -> Result<u64> { - let min_processed_version = get_processed_version_from_db(conn_pool.clone(), processor_names) - .await - .context("Failed to get minimum last success version from DB")?; + let min_processed_version = + get_min_processed_version_from_db(conn_pool.clone(), processor_names) + .await + .context("Failed to get minimum last success version from DB")?; // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. 
Ok(min_processed_version.unwrap_or( @@ -60,7 +65,7 @@ pub async fn get_minimum_last_success_version( )) } -async fn get_processed_version_from_db( +async fn get_min_processed_version_from_db( conn_pool: ArcDbPool, processor_names: Vec<String>, ) -> Result<Option<u64>> { @@ -398,7 +403,7 @@ mod tests { #[tokio::test] #[allow(clippy::needless_return)] - async fn test_get_minimum_last_success_version_no_checkpoints() { + async fn test_get_min_last_success_version_parquet_no_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); @@ -409,17 +414,20 @@ mod tests { let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()]; - let min_version = - get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names) - .await - .unwrap(); + let min_version = get_min_last_success_version_parquet( + &indexer_processor_config, + conn_pool, + processor_names, + ) + .await + .unwrap(); assert_eq!(min_version, 0); } #[tokio::test] #[allow(clippy::needless_return)] - async fn test_get_minimum_last_success_version_with_checkpoints() { + async fn test_get_min_last_success_version_parquet_with_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); @@ -448,17 +456,20 @@ mod tests { let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()]; - let min_version = - get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names) - .await - .unwrap(); + let min_version = get_min_last_success_version_parquet( + &indexer_processor_config, + conn_pool, + processor_names, + ) + .await + .unwrap(); assert_eq!(min_version, 5); } #[tokio::test] #[allow(clippy::needless_return)] - async fn test_get_minimum_last_success_version_with_partial_checkpoints() { + async fn 
test_get_min_last_success_version_parquet_with_partial_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); @@ -483,10 +494,13 @@ mod tests { "processor_2".to_string(), // No checkpoint for processor_2 ]; - let min_version = - get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names) - .await - .unwrap(); + let min_version = get_min_last_success_version_parquet( + &indexer_processor_config, + conn_pool, + processor_names, + ) + .await + .unwrap(); // Since processor_2 has no checkpoint, the minimum version should be the starting version of processor_1 assert_eq!(min_version, 0);