Skip to content

Commit

Permalink
Reset LastSuccessfulVersion to 0 if Restarting on Stauts=Complete (#571)
Browse files Browse the repository at this point in the history
* Reset latest saved version to 0 if restarting on row with status Complete

* linting
  • Loading branch information
dermanyang authored Oct 29, 2024
1 parent 9d44544 commit 9ac7d51
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
backfill_processor_status::backfill_end_version
.eq(excluded(backfill_processor_status::backfill_end_version)),
)),
None,
Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "),
)
.await?;
Ok(())
Expand Down
65 changes: 57 additions & 8 deletions rust/sdk-processor/src/utils/starting_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ use super::database::ArcDbPool;
use crate::{
config::indexer_processor_config::IndexerProcessorConfig,
db::common::models::{
backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus},
backfill_processor_status::{
BackfillProcessorStatus, BackfillProcessorStatusQuery, BackfillStatus,
},
processor_status::ProcessorStatusQuery,
},
utils::database::execute_with_better_error,
};
use anyhow::{Context, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use processor::schema::backfill_processor_status;

/// Get the appropriate starting version for the processor.
///
Expand Down Expand Up @@ -44,7 +49,7 @@ async fn get_starting_version_from_db(
let mut conn = conn_pool.get().await?;

if let Some(backfill_config) = &indexer_processor_config.backfill_config {
let backfill_status = BackfillProcessorStatusQuery::get_by_processor(
let backfill_status_option = BackfillProcessorStatusQuery::get_by_processor(
&backfill_config.backfill_alias,
&mut conn,
)
Expand All @@ -53,13 +58,57 @@ async fn get_starting_version_from_db(

// Return None if there is no checkpoint or if the backfill is old (complete).
// Otherwise, return the checkpointed version + 1.
return Ok(
backfill_status.and_then(|status| match status.backfill_status {
BackfillStatus::InProgress => Some(status.last_success_version as u64 + 1),
if let Some(status) = backfill_status_option {
match status.backfill_status {
BackfillStatus::InProgress => {
return Ok(Some(status.last_success_version as u64 + 1));
},
// If status is Complete, this is the start of a new backfill job.
BackfillStatus::Complete => None,
}),
);
BackfillStatus::Complete => {
let backfill_alias = status.backfill_alias.clone();
let backfill_end_version_mapped = status.backfill_end_version;
let status = BackfillProcessorStatus {
backfill_alias,
backfill_status: BackfillStatus::InProgress,
last_success_version: 0,
last_transaction_timestamp: None,
backfill_start_version: indexer_processor_config
.transaction_stream_config
.starting_version
.unwrap_or(0) as i64,
backfill_end_version: backfill_end_version_mapped,
};
execute_with_better_error(
conn_pool.clone(),
diesel::insert_into(backfill_processor_status::table)
.values(&status)
.on_conflict(backfill_processor_status::backfill_alias)
.do_update()
.set((
backfill_processor_status::backfill_status
.eq(excluded(backfill_processor_status::backfill_status)),
backfill_processor_status::last_success_version
.eq(excluded(backfill_processor_status::last_success_version)),
backfill_processor_status::last_updated
.eq(excluded(backfill_processor_status::last_updated)),
backfill_processor_status::last_transaction_timestamp.eq(excluded(
backfill_processor_status::last_transaction_timestamp,
)),
backfill_processor_status::backfill_start_version.eq(excluded(
backfill_processor_status::backfill_start_version,
)),
backfill_processor_status::backfill_end_version
.eq(excluded(backfill_processor_status::backfill_end_version)),
)),
None,
)
.await?;
return Ok(None);
},
}
} else {
return Ok(None);
}
}

let status = ProcessorStatusQuery::get_by_processor(
Expand Down

0 comments on commit 9ac7d51

Please sign in to comment.