Skip to content

Commit

Permalink
add docstring and update the name of functions
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Oct 31, 2024
1 parent 74b5cd0 commit 461caa2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl ProcessorConfig {
}

/// Get the Vec of table names for parquet processors only.
///
/// This is a convenience method to map the table names to include the processor name as a prefix, which
/// is useful for querying the status from the processor status table in the database.
pub fn get_table_names(&self) -> Option<Vec<String>> {
Expand All @@ -65,7 +66,6 @@ impl ProcessorConfig {
// Get the processor name as a prefix
let prefix = self.name();
// Use the tables from the config and map them to include the prefix
println!("config.tables: {:?}", prefix);
Some(
config
.tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::{get_minimum_last_success_version, get_starting_version},
starting_version::{get_min_last_success_version_parquet, get_starting_version},
},
};
use anyhow::Context;
Expand Down Expand Up @@ -62,7 +62,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
}

// TODO: Starting version from config. 0 if not set.
// Determine the processing mode (backfill or regular)
let is_backfill = self.config.backfill_config.is_some();

Expand All @@ -79,7 +78,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
"Failed to get table names for the processor {}",
self.config.processor_config.name()
))?;
get_minimum_last_success_version(&self.config, self.db_pool.clone(), table_names)
get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;
};

Expand Down
54 changes: 34 additions & 20 deletions rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ pub async fn get_starting_version(
))
}

pub async fn get_minimum_last_success_version(
/// Get the appropriate minimum last success version for the parquet processors.
///
/// This will return the minimum of the last success version of the processors in the list.
/// If no processor has a checkpoint, this will return the `starting_version` from the config, or 0 if not set.
pub async fn get_min_last_success_version_parquet(
indexer_processor_config: &IndexerProcessorConfig,
conn_pool: ArcDbPool,
processor_names: Vec<String>,
) -> Result<u64> {
let min_processed_version = get_processed_version_from_db(conn_pool.clone(), processor_names)
.await
.context("Failed to get minimum last success version from DB")?;
let min_processed_version =
get_min_processed_version_from_db(conn_pool.clone(), processor_names)
.await
.context("Failed to get minimum last success version from DB")?;

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
Ok(min_processed_version.unwrap_or(
Expand All @@ -60,7 +65,7 @@ pub async fn get_minimum_last_success_version(
))
}

async fn get_processed_version_from_db(
async fn get_min_processed_version_from_db(
conn_pool: ArcDbPool,
processor_names: Vec<String>,
) -> Result<Option<u64>> {
Expand Down Expand Up @@ -398,7 +403,7 @@ mod tests {

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_minimum_last_success_version_no_checkpoints() {
async fn test_get_min_last_success_version_parquet_no_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
Expand All @@ -409,17 +414,20 @@ mod tests {

let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()];

let min_version =
get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names)
.await
.unwrap();
let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

assert_eq!(min_version, 0);
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_minimum_last_success_version_with_checkpoints() {
async fn test_get_min_last_success_version_parquet_with_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
Expand Down Expand Up @@ -448,17 +456,20 @@ mod tests {

let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()];

let min_version =
get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names)
.await
.unwrap();
let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

assert_eq!(min_version, 5);
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_minimum_last_success_version_with_partial_checkpoints() {
async fn test_get_min_last_success_version_parquet_with_partial_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
Expand All @@ -483,10 +494,13 @@ mod tests {
"processor_2".to_string(), // No checkpoint for processor_2
];

let min_version =
get_minimum_last_success_version(&indexer_processor_config, conn_pool, processor_names)
.await
.unwrap();
let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

// Since processor_2 has no checkpoint, the minimum version should be the starting version of processor_1
assert_eq!(min_version, 0);
Expand Down

0 comments on commit 461caa2

Please sign in to comment.