Skip to content

Commit

Permalink
Merge pull request #10 from aptos-labs/sdy/backfill_status
Browse files Browse the repository at this point in the history
Add BackfillStatus and Improve StartingVersion Selection on (re)start
  • Loading branch information
dermanyang authored Oct 22, 2024
2 parents 1f42a23 + fbbae16 commit 6038b1c
Show file tree
Hide file tree
Showing 9 changed files with 1,189 additions and 69 deletions.
915 changes: 885 additions & 30 deletions aptos-indexer-processor-example/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion aptos-indexer-processor-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ edition = "2021"
[dependencies]
ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "89207266ac6ba3370f8ed72918f0f95f141814dc" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "366b93b5a1534efa24de1a8dac22e2fbe54fbd00" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "ca658a0881c4bff4e1dee14c59a7c63608a5b315" }
aptos-indexer-testing-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "366b93b5a1534efa24de1a8dac22e2fbe54fbd00" }
async-trait = "0.1.80"
chrono = { version = "0.4.19", features = ["clock", "serde"] }
clap = { version = "4.3.5", features = ["derive", "unstable-styles"] }
Expand Down
2 changes: 1 addition & 1 deletion aptos-indexer-processor-example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ server_config:
transaction_stream_config:
indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
starting_version: 0
# request_ending_version: 1
# request_ending_version: 100000
auth_token: "AUTH_TOKEN"
request_name_header: "events-processor"
db_config:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::{
config::indexer_processor_config::IndexerProcessorConfig,
db::common::models::{
backfill_processor_status::BackfillProcessorStatus, processor_status::ProcessorStatus,
backfill_processor_status::{BackfillProcessorStatus, BackfillStatus},
processor_status::ProcessorStatus,
},
schema::{
backfill_processor_status::{self},
processor_status,
},
schema::{backfill_processor_status, processor_status},
utils::database::{execute_with_better_error, ArcDbPool},
};
use anyhow::Result;
Expand Down Expand Up @@ -101,11 +105,21 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
backfill_start_version,
backfill_end_version,
} => {
let lst_success_version = last_success_batch.metadata.end_version as i64;
let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| {
lst_success_version >= backfill_end_version as i64
}) {
BackfillStatus::Complete
} else {
BackfillStatus::InProgress
};
let status = BackfillProcessorStatus {
backfill_alias: backfill_alias.clone(),
last_success_version: last_success_batch.metadata.end_version as i64,
backfill_status: backfill_status,
last_success_version: lst_success_version,
last_transaction_timestamp: end_timestamp,
backfill_start_version: backfill_start_version.unwrap_or(0) as i64,

backfill_end_version: backfill_end_version
.unwrap_or(last_success_batch.metadata.end_version)
as i64,
Expand All @@ -117,6 +131,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
.on_conflict(backfill_processor_status::backfill_alias)
.do_update()
.set((
backfill_processor_status::backfill_status.eq(excluded(backfill_processor_status::backfill_status)),
backfill_processor_status::last_success_version.eq(excluded(backfill_processor_status::last_success_version)),
backfill_processor_status::last_updated.eq(excluded(backfill_processor_status::last_updated)),
backfill_processor_status::last_transaction_timestamp.eq(excluded(backfill_processor_status::last_transaction_timestamp)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,52 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{schema::backfill_processor_status, utils::database::DbPoolConnection};
use diesel::deserialize;
use diesel::deserialize::{FromSql, FromSqlRow};
use diesel::expression::AsExpression;
use diesel::pg::{Pg, PgValue};
use diesel::serialize;
use diesel::serialize::{IsNull, Output, ToSql};
use diesel::sql_types::Text;
use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable};
use diesel_async::RunQueryDsl;
use std::io::Write;

#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)]
#[diesel(sql_type = Text)]
pub enum BackfillStatus {
// #[diesel(rename = "in_progress")]
InProgress,
// #[diesel(rename = "complete")]
Complete,
}

impl ToSql<Text, Pg> for BackfillStatus {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
match *self {
BackfillStatus::InProgress => out.write_all(b"in_progress")?,
BackfillStatus::Complete => out.write_all(b"complete")?,
}
Ok(IsNull::No)
}
}

impl FromSql<Text, Pg> for BackfillStatus {
fn from_sql(bytes: PgValue<'_>) -> deserialize::Result<Self> {
match bytes.as_bytes() {
b"in_progress" => Ok(BackfillStatus::InProgress),
b"complete" => Ok(BackfillStatus::Complete),
_ => Err("Unrecognized enum variant".into()),
}
}
}

#[derive(AsChangeset, Debug, Insertable)]
#[diesel(table_name = backfill_processor_status)]
/// Only tracking the latest version successfully processed
pub struct BackfillProcessorStatus {
pub backfill_alias: String,
pub backfill_status: BackfillStatus,
pub last_success_version: i64,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
pub backfill_start_version: i64,
Expand All @@ -23,6 +61,7 @@ pub struct BackfillProcessorStatus {
/// Only tracking the latest version successfully processed
pub struct BackfillProcessorStatusQuery {
pub backfill_alias: String,
pub backfill_status: BackfillStatus,
pub last_success_version: i64,
pub last_updated: chrono::NaiveDateTime,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-- Your SQL goes here
CREATE TABLE backfill_processor_status (
backfill_alias VARCHAR(50) NOT NULL,
backfill_status VARCHAR(50) NOT NULL,
last_success_version BIGINT NOT NULL,
last_updated TIMESTAMP NOT NULL DEFAULT NOW(),
last_transaction_timestamp TIMESTAMP NULL,
Expand Down
2 changes: 2 additions & 0 deletions aptos-indexer-processor-example/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ diesel::table! {
backfill_processor_status (backfill_alias) {
#[max_length = 50]
backfill_alias -> Varchar,
#[max_length = 50]
backfill_status -> Varchar,
last_success_version -> Int8,
last_updated -> Timestamp,
last_transaction_timestamp -> Nullable<Timestamp>,
Expand Down
1 change: 0 additions & 1 deletion aptos-indexer-processor-example/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ pub async fn run_migrations(postgres_connection_string: String, _conn_pool: ArcD
#[cfg(not(feature = "libpq"))]
pub async fn run_migrations(postgres_connection_string: String, conn_pool: ArcDbPool) {
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;

info!("Running migrations: {:?}", postgres_connection_string);
let conn = conn_pool
// We need to use this since AsyncConnectionWrapper doesn't know how to
Expand Down
Loading

0 comments on commit 6038b1c

Please sign in to comment.