Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[parquet-sdk-migration] add a logic to determine the starting version for parquet processor #580

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ impl ProcessorConfig {
pub fn name(&self) -> &'static str {
self.into()
}

/// Get the Vec of table names for parquet processors only.
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
///
/// This is a convenience method to map the table names to include the processor name as a prefix, which
/// is useful for querying the status from the processor status table in the database.
pub fn get_table_names(&self) -> Option<Vec<String>> {
match self {
ProcessorConfig::ParquetDefaultProcessor(config) => {
// Get the processor name as a prefix
let prefix = self.name();
// Use the tables from the config and map them to include the prefix
Some(
config
.tables
.iter()
.map(|table_name| format!("{}_{}", prefix, table_name))
.collect(),
)
},
_ => None, // For all other processor types, return None
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use crate::{
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::{get_min_last_success_version_parquet, get_starting_version},
},
};
use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
};
Expand Down Expand Up @@ -60,7 +62,25 @@ impl ProcessorTrait for ParquetDefaultProcessor {
},
}

// TODO: Starting version from config. 0 if not set.
// Determine the processing mode (backfill or regular)
let is_backfill = self.config.backfill_config.is_some();

// Query the starting version
let _starting_version = if is_backfill {
get_starting_version(&self.config, self.db_pool.clone()).await?;
} else {
// Regular mode logic: Fetch the minimum last successful version across all relevant tables
let table_names = self
.config
.processor_config
.get_table_names()
.context(format!(
"Failed to get table names for the processor {}",
self.config.processor_config.name()
))?;
get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
.await?;
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
Expand Down
183 changes: 182 additions & 1 deletion rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
utils::database::execute_with_better_error,
};
use anyhow::{Context, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel::{result::Error as DieselError, upsert::excluded, ExpressionMethods};
use processor::schema::backfill_processor_status;

/// Get the appropriate starting version for the processor.
Expand Down Expand Up @@ -42,6 +42,82 @@ pub async fn get_starting_version(
))
}

/// Get the appropriate minimum last success version for the parquet processors.
///
/// This will return the minimum of the last success version of the processors in the list.
/// If no processor has a checkpoint, this will return the `starting_version` from the config, or 0 if not set.
pub async fn get_min_last_success_version_parquet(
indexer_processor_config: &IndexerProcessorConfig,
conn_pool: ArcDbPool,
processor_names: Vec<String>,
) -> Result<u64> {
let min_processed_version =
get_min_processed_version_from_db(conn_pool.clone(), processor_names)
.await
.context("Failed to get minimum last success version from DB")?;

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
Ok(min_processed_version.unwrap_or(
indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0),
))
}

async fn get_min_processed_version_from_db(
conn_pool: ArcDbPool,
processor_names: Vec<String>,
) -> Result<Option<u64>> {
let mut queries = Vec::new();

// Spawn all queries concurrently with separate connections
for processor_name in processor_names {
let conn_pool = conn_pool.clone();
let processor_name = processor_name.clone();

let query = async move {
let mut conn = conn_pool.get().await.map_err(|err| {
// the type returned by conn_pool.get().await? does not have an appropriate error type that can be converted into diesel::result::Error.
// In this case, the error type is bb8::api::RunError<PoolError>, but the ? operator is trying to convert it to diesel::result::Error, which fails because there's no conversion implemented between those types.
// so we convert the error type from connection pool into a type that Diesel API expects.
DieselError::DatabaseError(
diesel::result::DatabaseErrorKind::UnableToSendCommand,
Box::new(err.to_string()),
)
})?;
ProcessorStatusQuery::get_by_processor(&processor_name, &mut conn).await
};

queries.push(query);
}

let results = futures::future::join_all(queries).await;

// Collect results and find the minimum processed version
let min_processed_version = results
.into_iter()
.filter_map(|res| {
match res {
// If the result is `Ok`, proceed to check the status
Ok(Some(status)) => {
// Return the version if the status contains a version
Some(status.last_success_version as u64)
},
// Handle specific cases where `Ok` contains `None` (no status found)
Ok(None) => Some(0),
// TODO: If the result is an `Err`, what should we do?
Err(e) => {
eprintln!("Error fetching processor status: {:?}", e);
None
},
}
})
.min();

Ok(min_processed_version)
}

async fn get_starting_version_from_db(
indexer_processor_config: &IndexerProcessorConfig,
conn_pool: ArcDbPool,
Expand Down Expand Up @@ -324,4 +400,109 @@ mod tests {

assert_eq!(starting_version, 11)
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_min_last_success_version_parquet_no_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10))
.await
.expect("Failed to create connection pool");
run_migrations(db.get_db_url(), conn_pool.clone()).await;

let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()];

let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

assert_eq!(min_version, 0);
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_min_last_success_version_parquet_with_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10))
.await
.expect("Failed to create connection pool");
run_migrations(db.get_db_url(), conn_pool.clone()).await;

// Insert processor statuses with different last_success_version values
diesel::insert_into(processor::schema::processor_status::table)
.values(vec![
ProcessorStatus {
processor: "processor_1".to_string(),
last_success_version: 10,
last_transaction_timestamp: None,
},
ProcessorStatus {
processor: "processor_2".to_string(),
last_success_version: 5,
last_transaction_timestamp: None,
},
])
.execute(&mut conn_pool.clone().get().await.unwrap())
.await
.expect("Failed to insert processor status");

let processor_names = vec!["processor_1".to_string(), "processor_2".to_string()];

let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

assert_eq!(min_version, 5);
}

#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_get_min_last_success_version_parquet_with_partial_checkpoints() {
let mut db = PostgresTestDatabase::new();
db.setup().await.unwrap();
let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0));
let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10))
.await
.expect("Failed to create connection pool");
run_migrations(db.get_db_url(), conn_pool.clone()).await;

// Insert processor statuses with different last_success_version values
diesel::insert_into(processor::schema::processor_status::table)
.values(vec![ProcessorStatus {
processor: "processor_1".to_string(),
last_success_version: 15,
last_transaction_timestamp: None,
}])
.execute(&mut conn_pool.clone().get().await.unwrap())
.await
.expect("Failed to insert processor status");

let processor_names = vec![
"processor_1".to_string(),
"processor_2".to_string(), // No checkpoint for processor_2
];

let min_version = get_min_last_success_version_parquet(
&indexer_processor_config,
conn_pool,
processor_names,
)
.await
.unwrap();

// Since processor_2 has no checkpoint, the minimum version should be the starting version of processor_1
assert_eq!(min_version, 0);
}
}
Loading