diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 96fa7195b..50bb39ae3 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -140,7 +140,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { backfill_processor_status::backfill_end_version .eq(excluded(backfill_processor_status::backfill_end_version)), )), - None, + Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), ) .await?; Ok(()) diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 2f1d3ba31..d497fa2f9 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -2,11 +2,16 @@ use super::database::ArcDbPool; use crate::{ config::indexer_processor_config::IndexerProcessorConfig, db::common::models::{ - backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus}, + backfill_processor_status::{ + BackfillProcessorStatus, BackfillProcessorStatusQuery, BackfillStatus, + }, processor_status::ProcessorStatusQuery, }, + utils::database::execute_with_better_error, }; use anyhow::{Context, Result}; +use diesel::{upsert::excluded, ExpressionMethods}; +use processor::schema::backfill_processor_status; /// Get the appropriate starting version for the processor. /// @@ -44,7 +49,7 @@ async fn get_starting_version_from_db( let mut conn = conn_pool.get().await?; if let Some(backfill_config) = &indexer_processor_config.backfill_config { - let backfill_status = BackfillProcessorStatusQuery::get_by_processor( + let backfill_status_option = BackfillProcessorStatusQuery::get_by_processor( &backfill_config.backfill_alias, &mut conn, ) @@ -53,13 +58,57 @@ async fn get_starting_version_from_db( // Return None if there is no checkpoint or if the backfill is old (complete). // Otherwise, return the checkpointed version + 1. - return Ok( - backfill_status.and_then(|status| match status.backfill_status { - BackfillStatus::InProgress => Some(status.last_success_version as u64 + 1), + if let Some(status) = backfill_status_option { + match status.backfill_status { + BackfillStatus::InProgress => { + return Ok(Some(status.last_success_version as u64 + 1)); + }, // If status is Complete, this is the start of a new backfill job. - BackfillStatus::Complete => None, - }), - ); + BackfillStatus::Complete => { + let backfill_alias = status.backfill_alias.clone(); + let backfill_end_version_mapped = status.backfill_end_version; + let status = BackfillProcessorStatus { + backfill_alias, + backfill_status: BackfillStatus::InProgress, + last_success_version: 0, + last_transaction_timestamp: None, + backfill_start_version: indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0) as i64, + backfill_end_version: backfill_end_version_mapped, + }; + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(backfill_processor_status::table) + .values(&status) + .on_conflict(backfill_processor_status::backfill_alias) + .do_update() + .set(( + backfill_processor_status::backfill_status + .eq(excluded(backfill_processor_status::backfill_status)), + backfill_processor_status::last_success_version + .eq(excluded(backfill_processor_status::last_success_version)), + backfill_processor_status::last_updated + .eq(excluded(backfill_processor_status::last_updated)), + backfill_processor_status::last_transaction_timestamp.eq(excluded( + backfill_processor_status::last_transaction_timestamp, + )), + backfill_processor_status::backfill_start_version.eq(excluded( + backfill_processor_status::backfill_start_version, + )), + backfill_processor_status::backfill_end_version + .eq(excluded(backfill_processor_status::backfill_end_version)), + )), + None, + ) + .await?; + return Ok(None); + }, + } + } else { + return Ok(None); + } } let status = ProcessorStatusQuery::get_by_processor(