From 31ddfa3b0993f0ca2576eea39613ff44be2f0bd0 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 15 Oct 2024 13:17:13 -0700 Subject: [PATCH 01/10] Add backfill status, improve starting ver selection. --- .../src/common/processor_status_saver.rs | 21 ++++- .../models/backfill_processor_status.rs | 39 +++++++++ .../up.sql | 1 + .../src/db/postgres/schema.rs | 2 + .../src/utils/starting_version.rs | 83 +++++++++++-------- 5 files changed, 109 insertions(+), 37 deletions(-) diff --git a/aptos-indexer-processor-example/src/common/processor_status_saver.rs b/aptos-indexer-processor-example/src/common/processor_status_saver.rs index 89d4b89..871eb03 100644 --- a/aptos-indexer-processor-example/src/common/processor_status_saver.rs +++ b/aptos-indexer-processor-example/src/common/processor_status_saver.rs @@ -1,9 +1,13 @@ use crate::{ config::indexer_processor_config::IndexerProcessorConfig, db::common::models::{ - backfill_processor_status::BackfillProcessorStatus, processor_status::ProcessorStatus, + backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, + processor_status::ProcessorStatus, + }, + schema::{ + backfill_processor_status::{self, last_success_version}, + processor_status, }, - schema::{backfill_processor_status, processor_status}, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -101,11 +105,21 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { backfill_start_version, backfill_end_version, } => { + let lst_success_version = last_success_batch.metadata.end_version as i64; + let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| { + lst_success_version >= backfill_end_version as i64 + }) { + BackfillStatus::Complete + } else { + BackfillStatus::InProgress + }; let status = BackfillProcessorStatus { backfill_alias: backfill_alias.clone(), - last_success_version: last_success_batch.metadata.end_version as i64, + backfill_status: backfill_status, + last_success_version: lst_success_version, last_transaction_timestamp: end_timestamp, backfill_start_version: backfill_start_version.unwrap_or(0) as i64, + backfill_end_version: backfill_end_version .unwrap_or(last_success_batch.metadata.end_version) as i64, @@ -117,6 +131,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { .on_conflict(backfill_processor_status::backfill_alias) .do_update() .set(( + backfill_processor_status::backfill_status.eq(excluded(backfill_processor_status::backfill_status)), backfill_processor_status::last_success_version.eq(excluded(backfill_processor_status::last_success_version)), backfill_processor_status::last_updated.eq(excluded(backfill_processor_status::last_updated)), backfill_processor_status::last_transaction_timestamp.eq(excluded(backfill_processor_status::last_transaction_timestamp)), diff --git a/aptos-indexer-processor-example/src/db/common/models/backfill_processor_status.rs b/aptos-indexer-processor-example/src/db/common/models/backfill_processor_status.rs index 55e2086..01ff6fb 100644 --- a/aptos-indexer-processor-example/src/db/common/models/backfill_processor_status.rs +++ b/aptos-indexer-processor-example/src/db/common/models/backfill_processor_status.rs @@ -4,14 +4,52 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::{schema::backfill_processor_status, utils::database::DbPoolConnection}; +use diesel::deserialize; +use diesel::deserialize::{FromSql, FromSqlRow}; +use diesel::expression::AsExpression; +use diesel::pg::{Pg, PgValue}; +use diesel::serialize; +use diesel::serialize::{IsNull, Output, ToSql}; +use diesel::sql_types::Text; use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable}; use diesel_async::RunQueryDsl; +use std::io::Write; + +#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)] +#[diesel(sql_type = Text)] +pub enum BackfillStatus { + // #[diesel(rename = "in_progress")] + InProgress, + // #[diesel(rename = "complete")] + Complete, +} + +impl ToSql for BackfillStatus { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + BackfillStatus::InProgress => out.write_all(b"in_progress")?, + BackfillStatus::Complete => out.write_all(b"complete")?, + } + Ok(IsNull::No) + } +} + +impl FromSql for BackfillStatus { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"in_progress" => Ok(BackfillStatus::InProgress), + b"complete" => Ok(BackfillStatus::Complete), + _ => Err("Unrecognized enum variant".into()), + } + } +} #[derive(AsChangeset, Debug, Insertable)] #[diesel(table_name = backfill_processor_status)] /// Only tracking the latest version successfully processed pub struct BackfillProcessorStatus { pub backfill_alias: String, + pub backfill_status: BackfillStatus, pub last_success_version: i64, pub last_transaction_timestamp: Option, pub backfill_start_version: i64, @@ -23,6 +61,7 @@ pub struct BackfillProcessorStatus { /// Only tracking the latest version successfully processed pub struct BackfillProcessorStatusQuery { pub backfill_alias: String, + pub backfill_status: BackfillStatus, pub last_success_version: i64, pub last_updated: chrono::NaiveDateTime, pub last_transaction_timestamp: Option, diff --git a/aptos-indexer-processor-example/src/db/postgres/migrations/2024-10-02-193334_backfill_processor_status/up.sql b/aptos-indexer-processor-example/src/db/postgres/migrations/2024-10-02-193334_backfill_processor_status/up.sql index aa08b75..7bcb084 100644 --- a/aptos-indexer-processor-example/src/db/postgres/migrations/2024-10-02-193334_backfill_processor_status/up.sql +++ b/aptos-indexer-processor-example/src/db/postgres/migrations/2024-10-02-193334_backfill_processor_status/up.sql @@ -1,6 +1,7 @@ -- Your SQL goes here CREATE TABLE backfill_processor_status ( backfill_alias VARCHAR(50) NOT NULL, + backfill_status VARCHAR(50) NOT NULL, last_success_version BIGINT NOT NULL, last_updated TIMESTAMP NOT NULL DEFAULT NOW(), last_transaction_timestamp TIMESTAMP NULL, diff --git a/aptos-indexer-processor-example/src/db/postgres/schema.rs b/aptos-indexer-processor-example/src/db/postgres/schema.rs index c500a27..6873a57 100644 --- a/aptos-indexer-processor-example/src/db/postgres/schema.rs +++ b/aptos-indexer-processor-example/src/db/postgres/schema.rs @@ -4,6 +4,8 @@ diesel::table! { backfill_processor_status (backfill_alias) { #[max_length = 50] backfill_alias -> Varchar, + #[max_length = 50] + backfill_status -> Varchar, last_success_version -> Int8, last_updated -> Timestamp, last_transaction_timestamp -> Nullable, diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 9a9f56e..38f26d6 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -1,68 +1,83 @@ +use std::cmp::max; + use super::database::ArcDbPool; use crate::{ config::indexer_processor_config::IndexerProcessorConfig, db::common::models::{ - backfill_processor_status::BackfillProcessorStatusQuery, + backfill_processor_status::{BackfillProcessorStatusQuery, BackfillStatus}, processor_status::ProcessorStatusQuery, }, }; use anyhow::{Context, Result}; +/// Get the appropriate starting version for the processor. +/// +/// If it is a regular processor, this will return the higher of the checkpointed version, +/// or `staring_version` from the config, or 0 if not set. +/// +/// If this is a backfill processor and threre is an in-progress backfill, this will return +/// the checkpointed version + 1. +/// +/// If this is a backfill processor and there is not an in-progress backfill (i.e., no checkpoint or +/// backfill status is COMPLETE), this will return `starting_version` from the config, or 0 if not set. pub async fn get_starting_version( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result { - // If starting_version is set in TransactionStreamConfig, use that - if indexer_processor_config - .transaction_stream_config - .starting_version - .is_some() - { - return Ok(indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap()); - } - - // If it's not set, check if the DB has latest_processed_version set and use that - let latest_processed_version_from_db = + // Check if there's a checkpoint in the approrpiate processor status table. + let latest_processed_version = get_latest_processed_version_from_db(indexer_processor_config, conn_pool) .await .context("Failed to get latest processed version from DB")?; - if let Some(latest_processed_version_tracker) = latest_processed_version_from_db { - return Ok(latest_processed_version_tracker + 1); - } - // If latest_processed_version is not stored in DB, return the default 0 - Ok(0) + // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. + Ok(latest_processed_version.unwrap_or( + indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0), + )) } -/// Gets the start version for the processor. If not found, start from 0. -pub async fn get_latest_processed_version_from_db( +async fn get_latest_processed_version_from_db( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result> { let mut conn = conn_pool.get().await?; if let Some(backfill_config) = &indexer_processor_config.backfill_config { - return match BackfillProcessorStatusQuery::get_by_processor( + let backfill_status = BackfillProcessorStatusQuery::get_by_processor( &backfill_config.backfill_alias, &mut conn, ) - .await? - { - Some(status) => Ok(Some(status.last_success_version as u64)), - None => Ok(None), - }; + .await?; + + // Return None if there is no checkpoint or if the backfill is old (complete). + // Otherwise, return the checkpointed version + 1. + return Ok( + backfill_status.and_then(|status| match status.backfill_status { + BackfillStatus::InProgress => Some(status.last_success_version as u64 + 1), + // If status is Complete, this is the start of a new backfill job. + BackfillStatus::Complete => None, + }), + ); } - match ProcessorStatusQuery::get_by_processor( + let status = ProcessorStatusQuery::get_by_processor( indexer_processor_config.processor_config.name(), &mut conn, ) - .await? - { - Some(status) => Ok(Some(status.last_success_version as u64)), - None => Ok(None), - } + .await?; + + // Return None if there is no checkpoint. Otherwise, + // return the higher of the checkpointed version + 1 the `starting_version`. + Ok(status.map(|status| { + std::cmp::max( + status.last_success_version as u64 + 1, + indexer_processor_config + .transaction_stream_config + .starting_version + .unwrap_or(0), + ) + })) } From 95e34e5da2e63ee59bc43ad4c07d1820b2dec4ff Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 16 Oct 2024 10:25:04 -0700 Subject: [PATCH 02/10] start writing unit tests --- aptos-indexer-processor-example/Cargo.lock | 20 +++++----- aptos-indexer-processor-example/Cargo.toml | 2 +- aptos-indexer-processor-example/config.yaml | 8 ++-- .../src/utils/starting_version.rs | 39 +++++++++++++++++++ 4 files changed, 54 insertions(+), 15 deletions(-) diff --git a/aptos-indexer-processor-example/Cargo.lock b/aptos-indexer-processor-example/Cargo.lock index 6c99a6e..723ebcb 100644 --- a/aptos-indexer-processor-example/Cargo.lock +++ b/aptos-indexer-processor-example/Cargo.lock @@ -122,7 +122,7 @@ version = "0.1.0" dependencies = [ "ahash", "anyhow", - "aptos-indexer-processor-sdk 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc)", + "aptos-indexer-processor-sdk 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "aptos-indexer-processor-sdk-server-framework", "async-trait", "chrono", @@ -149,11 +149,11 @@ dependencies = [ [[package]] name = "aptos-indexer-processor-sdk" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc#89207266ac6ba3370f8ed72918f0f95f141814dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" dependencies = [ "ahash", "anyhow", - "aptos-indexer-transaction-stream 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc)", + "aptos-indexer-transaction-stream 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "aptos-protos", "async-trait", "bcs", @@ -162,7 +162,7 @@ dependencies = [ "derive_builder", "futures", "hex", - "instrumented-channel 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc)", + "instrumented-channel 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "kanal", "mockall", "num_cpus", @@ -238,17 +238,17 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc#89207266ac6ba3370f8ed72918f0f95f141814dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" dependencies = [ "anyhow", - "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc)", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "aptos-protos", "chrono", "futures-util", "once_cell", "prometheus", "prost", - "sample 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc)", + "sample 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "serde", "tokio", "tonic", @@ -280,7 +280,7 @@ dependencies = [ [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc#89207266ac6ba3370f8ed72918f0f95f141814dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" dependencies = [ "chrono", ] @@ -1572,7 +1572,7 @@ dependencies = [ [[package]] name = "instrumented-channel" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc#89207266ac6ba3370f8ed72918f0f95f141814dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" dependencies = [ "delegate", "derive_builder", @@ -2723,7 +2723,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "sample" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=89207266ac6ba3370f8ed72918f0f95f141814dc#89207266ac6ba3370f8ed72918f0f95f141814dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" dependencies = [ "tracing", ] diff --git a/aptos-indexer-processor-example/Cargo.toml b/aptos-indexer-processor-example/Cargo.toml index 16d3212..8dec19e 100644 --- a/aptos-indexer-processor-example/Cargo.toml +++ b/aptos-indexer-processor-example/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "89207266ac6ba3370f8ed72918f0f95f141814dc" } +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "366b93b5a1534efa24de1a8dac22e2fbe54fbd00" } aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "ca658a0881c4bff4e1dee14c59a7c63608a5b315" } async-trait = "0.1.80" chrono = { version = "0.4.19", features = ["clock", "serde"] } diff --git a/aptos-indexer-processor-example/config.yaml b/aptos-indexer-processor-example/config.yaml index 24d3ca6..99e1474 100644 --- a/aptos-indexer-processor-example/config.yaml +++ b/aptos-indexer-processor-example/config.yaml @@ -3,13 +3,13 @@ health_check_port: 8085 server_config: processor_config: type: "events_processor" - # backfill_config: - # backfill_alias: "events_processor_backfill_1" + backfill_config: + backfill_alias: "events_processor_backfill_1" transaction_stream_config: indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443" starting_version: 0 - # request_ending_version: 1 - auth_token: "AUTH_TOKEN" + request_ending_version: 100000 + auth_token: "aptoslabs_No5KWpMQKvz_7ohYQejzxmgpL27BS9gK784GpA14EP9mb" request_name_header: "events-processor" db_config: postgres_connection_string: postgresql://postgres:@localhost:5432/example diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 38f26d6..1048c2e 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -81,3 +81,42 @@ async fn get_latest_processed_version_from_db( ) })) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + config::indexer_processor_config::{BackfillConfig, DbConfig, IndexerProcessorConfig}, + db::common::models::{ + backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, + processor_status::ProcessorStatus, + }, + }; + use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; + use chrono::Utc; + + #[tokio::test] + async fn test_get_starting_version_no_checkpoint() { + let indexer_processor_config = IndexerProcessorConfig { + db_config: DbConfig { + postgres_connection_string: "test".to_string(), + db_pool_size: 1, + }, + transaction_stream_config: TransactionStreamConfig { + indexer_grpc_data_service_address: "test_url".parse().unwrap(), + starting_version: None, + request_ending_version: None, + auth_token: "test".to_string(), + request_name_header: "test".to_string(), + }, + processor_config: Default::default(), + backfill_config: None, + }; + let conn_pool = ArcDbPool::new(Default::default()); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + assert_eq!(starting_version, 0); + } +} From 7fd10c5b0cbb994a80ca869cdd1f65b64d91d1d7 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 16 Oct 2024 14:35:04 -0700 Subject: [PATCH 03/10] Add default processor tests --- aptos-indexer-processor-example/Cargo.lock | 905 +++++++++++++++++- aptos-indexer-processor-example/Cargo.toml | 1 + .../src/utils/starting_version.rs | 110 ++- 3 files changed, 981 insertions(+), 35 deletions(-) diff --git a/aptos-indexer-processor-example/Cargo.lock b/aptos-indexer-processor-example/Cargo.lock index 723ebcb..2d98d56 100644 --- a/aptos-indexer-processor-example/Cargo.lock +++ b/aptos-indexer-processor-example/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ "anyhow", "aptos-indexer-processor-sdk 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", "aptos-indexer-processor-sdk-server-framework", + "aptos-indexer-testing-framework", "async-trait", "chrono", "clap", @@ -235,6 +236,32 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "aptos-indexer-testing-framework" +version = "0.1.0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00#366b93b5a1534efa24de1a8dac22e2fbe54fbd00" +dependencies = [ + "anyhow", + "aptos-indexer-processor-sdk 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=366b93b5a1534efa24de1a8dac22e2fbe54fbd00)", + "aptos-protos", + "async-trait", + "diesel", + "diesel-async", + "diesel_migrations", + "futures", + "futures-util", + "native-tls", + "postgres-native-tls", + "serde_json", + "testcontainers", + "tokio", + "tokio-postgres", + "tokio-retry", + "tokio-stream", + "tonic", + "url", +] + [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" @@ -386,6 +413,12 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -623,6 +656,56 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aed08d3adb6ebe0eff737115056652670ae290f177759aac19c30456135f94c" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.1.0", + "http-body-util", + "hyper 1.4.1", + "hyper-named-pipe", + "hyper-rustls 0.26.0", + "hyper-util", + "hyperlocal-next", + "log", + "pin-project-lite", + "rustls 0.22.4", + "rustls-native-certs 0.7.1", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.44.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709d9aa1c37abb89d40f19f5d0ad6f0d88cb1581264e571c9350fc5bb89cf1c5" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -682,7 +765,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -850,6 +933,12 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "debugid" version = "0.8.0" @@ -870,6 +959,16 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derive_builder" version = "0.20.0" @@ -916,6 +1015,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", + "pq-sys", "serde_json", ] @@ -976,6 +1076,38 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + +[[package]] +name = "docker_credential" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31951f49556e34d90ed28342e1df7e1cb7a229c4cab0aecc627b5d91edd41d07" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + [[package]] name = "downcast" version = "0.11.0" @@ -1015,6 +1147,27 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1300,6 +1453,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.4.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1345,6 +1517,51 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-proto" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28757f23aa75c98f254cf0405e6d8c25b831b32921b050a66692427679b1f243" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot", + "rand", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1354,6 +1571,26 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.12" @@ -1432,7 +1669,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1455,6 +1692,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -1463,6 +1701,60 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "log", + "rustls 0.22.4", + "rustls-native-certs 0.7.1", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.12", + "rustls-native-certs 0.8.0", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "webpki-roots", ] [[package]] @@ -1484,12 +1776,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + +[[package]] +name = "hyperlocal-next" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf569d43fa9848e510358c07b80f4adf34084ddc28c6a4a651ee8474c070dcc" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", "tokio", + "tower-service", ] [[package]] @@ -1521,6 +1833,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -1539,6 +1861,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1549,6 +1872,7 @@ checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -1595,6 +1919,24 @@ dependencies = [ "prometheus-client", ] +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is-terminal" version = "0.4.12" @@ -1694,12 +2036,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] -name = "linked-hash-map" -version = "0.5.6" +name = "libredox" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - -[[package]] +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", +] + +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + +[[package]] name = "linkme" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1741,6 +2093,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "mach2" version = "0.4.2" @@ -1750,6 +2111,12 @@ dependencies = [ "libc", ] +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matchers" version = "0.1.0" @@ -1964,6 +2331,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -2107,6 +2480,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "4.2.2" @@ -2142,7 +2521,32 @@ dependencies = [ "libc", "redox_syscall 0.5.3", "smallvec", - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax 0.8.4", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.4", + "structmeta", + "syn 2.0.74", ] [[package]] @@ -2275,6 +2679,12 @@ dependencies = [ "postgres-protocol", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "pprof" version = "0.11.1" @@ -2307,6 +2717,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pq-sys" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" +dependencies = [ + "vcpkg", +] + [[package]] name = "predicates" version = "3.1.2" @@ -2444,6 +2863,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quick-xml" version = "0.26.0" @@ -2453,6 +2878,54 @@ dependencies = [ "memchr", ] +[[package]] +name = "quinn" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.23.12", + "socket2", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash", + "rustls 0.23.12", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -2539,6 +3012,17 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "regex" version = "1.10.6" @@ -2583,6 +3067,62 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "reqwest" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.6", + "hickory-resolver", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.3", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls 0.23.12", + "rustls-native-certs 0.8.0", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "windows-registry", +] + +[[package]] +name = "resolv-conf" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" +dependencies = [ + "hostname", + "quick-error", +] + [[package]] name = "rgb" version = "0.8.48" @@ -2641,6 +3181,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustix" version = "0.38.34" @@ -2668,6 +3214,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +dependencies = [ + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.7.1" @@ -2681,6 +3241,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.3" @@ -2827,6 +3400,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "serde_spanned" version = "0.6.7" @@ -2848,6 +3432,36 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.4.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -2978,6 +3592,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 2.0.74", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "strum" version = "0.24.1" @@ -3062,6 +3699,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "tempfile" @@ -3082,6 +3722,35 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" +[[package]] +name = "testcontainers" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "725cbe485aafddfd8b2d01665937c95498d894c07fabd9c4e06a53c7da4ccc56" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "dirs", + "docker_credential", + "either", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "reqwest", + "serde", + "serde_json", + "serde_with", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "url", +] + [[package]] name = "thiserror" version = "1.0.63" @@ -3112,6 +3781,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -3211,22 +3911,44 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.12", "rustls-pki-types", "tokio", ] [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -3292,7 +4014,7 @@ dependencies = [ "base64 0.21.7", "bytes", "flate2", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", @@ -3300,11 +4022,11 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls-native-certs", + "rustls-native-certs 0.7.1", "rustls-pemfile", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.25.0", "tokio-stream", "tower", "tower-layer", @@ -3472,7 +4194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", "serde", ] @@ -3560,6 +4282,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.93" @@ -3599,6 +4333,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "1.5.1" @@ -3610,6 +4353,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "widestring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" + [[package]] name = "winapi" version = "0.3.9" @@ -3638,7 +4387,46 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", ] [[package]] @@ -3647,7 +4435,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -3656,7 +4444,22 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -3665,28 +4468,46 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3699,24 +4520,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -3732,6 +4577,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/aptos-indexer-processor-example/Cargo.toml b/aptos-indexer-processor-example/Cargo.toml index 8dec19e..a91fb14 100644 --- a/aptos-indexer-processor-example/Cargo.toml +++ b/aptos-indexer-processor-example/Cargo.toml @@ -9,6 +9,7 @@ ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "366b93b5a1534efa24de1a8dac22e2fbe54fbd00" } aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "ca658a0881c4bff4e1dee14c59a7c63608a5b315" } +aptos-indexer-testing-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "366b93b5a1534efa24de1a8dac22e2fbe54fbd00" } async-trait = "0.1.80" chrono = { version = "0.4.19", features = ["clock", "serde"] } clap = { version = "4.3.5", features = ["derive", "unstable-styles"] } diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 1048c2e..cef7ded 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -50,7 +50,8 @@ async fn get_latest_processed_version_from_db( &backfill_config.backfill_alias, &mut conn, ) - .await?; + .await + .context("Failed to query backfill_processor_status table.")?; // Return None if there is no checkpoint or if the backfill is old (complete). // Otherwise, return the checkpointed version + 1. @@ -67,7 +68,8 @@ async fn get_latest_processed_version_from_db( indexer_processor_config.processor_config.name(), &mut conn, ) - .await?; + .await + .context("Failed to query backfill_processor_status table.")?; // Return None if there is no checkpoint. Otherwise, // return the higher of the checkpointed version + 1 the `starting_version`. @@ -86,37 +88,125 @@ async fn get_latest_processed_version_from_db( mod tests { use super::*; use crate::{ - config::indexer_processor_config::{BackfillConfig, DbConfig, IndexerProcessorConfig}, + config::{ + indexer_processor_config::{DbConfig, IndexerProcessorConfig}, + processor_config::ProcessorConfig, + }, db::common::models::{ - backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, - processor_status::ProcessorStatus, + backfill_processor_status::BackfillProcessorStatus, processor_status::ProcessorStatus, }, + schema::{backfill_processor_status, processor_status}, + utils::database::{execute_with_better_error, new_db_pool, run_migrations}, }; use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; - use chrono::Utc; + use aptos_indexer_testing_framework::database::{PostgresTestDatabase, TestDatabase}; + use diesel_async::RunQueryDsl; + use url::Url; #[tokio::test] async fn test_get_starting_version_no_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); let indexer_processor_config = IndexerProcessorConfig { db_config: DbConfig { - postgres_connection_string: "test".to_string(), + postgres_connection_string: db.get_db_url(), db_pool_size: 1, }, transaction_stream_config: TransactionStreamConfig { - indexer_grpc_data_service_address: "test_url".parse().unwrap(), + indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), starting_version: None, request_ending_version: None, auth_token: "test".to_string(), request_name_header: "test".to_string(), + indexer_grpc_http2_ping_interval_secs: 1, + indexer_grpc_http2_ping_timeout_secs: 1, + indexer_grpc_reconnection_timeout_secs: 1, + indexer_grpc_response_item_timeout_secs: 1, }, - processor_config: Default::default(), + processor_config: ProcessorConfig::EventsProcessor, backfill_config: None, }; - let conn_pool = ArcDbPool::new(Default::default()); + let conn_pool = new_db_pool( + indexer_processor_config + .db_config + .postgres_connection_string + .as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations( + indexer_processor_config + .db_config + .postgres_connection_string + .clone(), + conn_pool.clone(), + ) + .await; let starting_version = get_starting_version(&indexer_processor_config, conn_pool) .await .unwrap(); assert_eq!(starting_version, 0); } + + #[tokio::test] + async fn test_get_starting_version_with_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = IndexerProcessorConfig { + db_config: DbConfig { + postgres_connection_string: db.get_db_url(), + db_pool_size: 1, + }, + transaction_stream_config: TransactionStreamConfig { + indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + starting_version: None, + request_ending_version: None, + auth_token: "test".to_string(), + request_name_header: "test".to_string(), + indexer_grpc_http2_ping_interval_secs: 1, + indexer_grpc_http2_ping_timeout_secs: 1, + indexer_grpc_reconnection_timeout_secs: 1, + indexer_grpc_response_item_timeout_secs: 1, + }, + processor_config: ProcessorConfig::EventsProcessor, + backfill_config: None, + }; + let conn_pool = new_db_pool( + indexer_processor_config + .db_config + .postgres_connection_string + .as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations( + indexer_processor_config + .db_config + .postgres_connection_string + .clone(), + conn_pool.clone(), + ) + .await; + + let binding = conn_pool.clone(); + let mut conn = binding.get().await.unwrap(); + let processor_name = indexer_processor_config.processor_config.name(); + diesel::insert_into(processor_status::table) + .values(ProcessorStatus { + processor: processor_name.to_string(), + last_success_version: 10, + last_transaction_timestamp: None, + }) + .execute(&mut conn) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + assert_eq!(starting_version, 11); + } } From 7972ec1eb167b4af1317a59e3aa852ca38753b21 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 16 Oct 2024 19:02:22 -0700 Subject: [PATCH 04/10] wen test context work --- .../src/common/processor_status_saver.rs | 2 +- .../src/utils/starting_version.rs | 323 ++++++++++++++---- 2 files changed, 261 insertions(+), 64 deletions(-) diff --git a/aptos-indexer-processor-example/src/common/processor_status_saver.rs b/aptos-indexer-processor-example/src/common/processor_status_saver.rs index 871eb03..d718b8f 100644 --- a/aptos-indexer-processor-example/src/common/processor_status_saver.rs +++ b/aptos-indexer-processor-example/src/common/processor_status_saver.rs @@ -5,7 +5,7 @@ use crate::{ processor_status::ProcessorStatus, }, schema::{ - backfill_processor_status::{self, last_success_version}, + backfill_processor_status::{self}, processor_status, }, utils::database::{execute_with_better_error, ArcDbPool}, diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index cef7ded..787017b 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -1,5 +1,3 @@ -use std::cmp::max; - use super::database::ArcDbPool; use crate::{ config::indexer_processor_config::IndexerProcessorConfig, @@ -69,7 +67,7 @@ async fn get_latest_processed_version_from_db( &mut conn, ) .await - .context("Failed to query backfill_processor_status table.")?; + .context("Failed to query processor_status table.")?; // Return None if there is no checkpoint. Otherwise, // return the higher of the checkpointed version + 1 the `starting_version`. @@ -89,75 +87,94 @@ mod tests { use super::*; use crate::{ config::{ - indexer_processor_config::{DbConfig, IndexerProcessorConfig}, + indexer_processor_config::{BackfillConfig, DbConfig, IndexerProcessorConfig}, processor_config::ProcessorConfig, }, db::common::models::{ backfill_processor_status::BackfillProcessorStatus, processor_status::ProcessorStatus, }, schema::{backfill_processor_status, processor_status}, - utils::database::{execute_with_better_error, new_db_pool, run_migrations}, + utils::database::{new_db_pool, run_migrations}, }; use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; use aptos_indexer_testing_framework::database::{PostgresTestDatabase, TestDatabase}; use diesel_async::RunQueryDsl; use url::Url; - #[tokio::test] - async fn test_get_starting_version_no_checkpoint() { - let mut db = PostgresTestDatabase::new(); - db.setup().await.unwrap(); - let indexer_processor_config = IndexerProcessorConfig { - db_config: DbConfig { - postgres_connection_string: db.get_db_url(), - db_pool_size: 1, - }, - transaction_stream_config: TransactionStreamConfig { - indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - starting_version: None, - request_ending_version: None, - auth_token: "test".to_string(), - request_name_header: "test".to_string(), - indexer_grpc_http2_ping_interval_secs: 1, - indexer_grpc_http2_ping_timeout_secs: 1, - indexer_grpc_reconnection_timeout_secs: 1, - indexer_grpc_response_item_timeout_secs: 1, - }, - processor_config: ProcessorConfig::EventsProcessor, - backfill_config: None, - }; - let conn_pool = new_db_pool( - indexer_processor_config - .db_config - .postgres_connection_string - .as_str(), - Some(indexer_processor_config.db_config.db_pool_size), - ) - .await - .expect("Failed to create connection pool"); - run_migrations( - indexer_processor_config - .db_config - .postgres_connection_string - .clone(), - conn_pool.clone(), - ) - .await; + struct TestContext { + conn_pool: ArcDbPool, + indexer_processor_config: IndexerProcessorConfig, + } - let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + impl TestContext { + async fn new(backfill_config: Option) -> Self { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = IndexerProcessorConfig { + db_config: DbConfig { + postgres_connection_string: db.get_db_url(), + db_pool_size: 2, + }, + transaction_stream_config: TransactionStreamConfig { + indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + starting_version: None, + request_ending_version: None, + auth_token: "test".to_string(), + request_name_header: "test".to_string(), + indexer_grpc_http2_ping_interval_secs: 1, + indexer_grpc_http2_ping_timeout_secs: 1, + indexer_grpc_reconnection_timeout_secs: 1, + indexer_grpc_response_item_timeout_secs: 1, + }, + processor_config: ProcessorConfig::EventsProcessor, + backfill_config: backfill_config, + }; + + let conn_pool = new_db_pool( + indexer_processor_config + .db_config + .postgres_connection_string + .as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) .await - .unwrap(); + .expect("Failed to create connection pool"); + + run_migrations( + indexer_processor_config + .db_config + .postgres_connection_string + .clone(), + conn_pool.clone(), + ) + .await; + + TestContext { + conn_pool, + indexer_processor_config, + } + } + } + + #[tokio::test] + async fn test_with_test_context() { + let test_context = TestContext::new(None).await; + let config = test_context.indexer_processor_config; + let conn_pool = test_context.conn_pool; + + let starting_version = get_starting_version(&config, conn_pool).await.unwrap(); + assert_eq!(starting_version, 0); } #[tokio::test] - async fn test_get_starting_version_with_checkpoint() { + async fn test_without_test_context() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); let indexer_processor_config = IndexerProcessorConfig { db_config: DbConfig { postgres_connection_string: db.get_db_url(), - db_pool_size: 1, + db_pool_size: 2, }, transaction_stream_config: TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), @@ -191,22 +208,202 @@ mod tests { ) .await; - let binding = conn_pool.clone(); - let mut conn = binding.get().await.unwrap(); - let processor_name = indexer_processor_config.processor_config.name(); - diesel::insert_into(processor_status::table) - .values(ProcessorStatus { - processor: processor_name.to_string(), - last_success_version: 10, - last_transaction_timestamp: None, - }) - .execute(&mut conn) - .await - .expect("Failed to insert processor status"); - let starting_version = get_starting_version(&indexer_processor_config, conn_pool) .await .unwrap(); - assert_eq!(starting_version, 11); + + assert_eq!(starting_version, 0); } + + // #[tokio::test] + // async fn test_get_starting_version_with_checkpoint() { + // let mut db = PostgresTestDatabase::new(); + // db.setup().await.unwrap(); + // let indexer_processor_config = IndexerProcessorConfig { + // db_config: DbConfig { + // postgres_connection_string: db.get_db_url(), + // db_pool_size: 2, + // }, + // transaction_stream_config: TransactionStreamConfig { + // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + // starting_version: None, + // request_ending_version: None, + // auth_token: "test".to_string(), + // request_name_header: "test".to_string(), + // indexer_grpc_http2_ping_interval_secs: 1, + // indexer_grpc_http2_ping_timeout_secs: 1, + // indexer_grpc_reconnection_timeout_secs: 1, + // indexer_grpc_response_item_timeout_secs: 1, + // }, + // processor_config: ProcessorConfig::EventsProcessor, + // backfill_config: None, + // }; + // let conn_pool = new_db_pool( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .as_str(), + // Some(indexer_processor_config.db_config.db_pool_size), + // ) + // .await + // .expect("Failed to create connection pool"); + // run_migrations( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .clone(), + // conn_pool.clone(), + // ) + // .await; + + // let binding = conn_pool.clone(); + // let mut conn = binding.get().await.unwrap(); + // let processor_name = indexer_processor_config.processor_config.name(); + // diesel::insert_into(processor_status::table) + // .values(ProcessorStatus { + // processor: processor_name.to_string(), + // last_success_version: 10, + // last_transaction_timestamp: None, + // }) + // .execute(&mut conn) + // .await + // .expect("Failed to insert processor status"); + + // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + // .await + // .unwrap(); + // assert_eq!(starting_version, 11); + // } + + // #[tokio::test] + // async fn test_backfill_get_starting_version_with_completed_checkpoint() { + // let mut db = PostgresTestDatabase::new(); + // let backfill_alias = "backfill_processor".to_string(); + // db.setup().await.unwrap(); + // let indexer_processor_config = IndexerProcessorConfig { + // db_config: DbConfig { + // postgres_connection_string: db.get_db_url(), + // db_pool_size: 2, + // }, + // transaction_stream_config: TransactionStreamConfig { + // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + // starting_version: None, + // request_ending_version: None, + // auth_token: "test".to_string(), + // request_name_header: "test".to_string(), + // indexer_grpc_http2_ping_interval_secs: 1, + // indexer_grpc_http2_ping_timeout_secs: 1, + // indexer_grpc_reconnection_timeout_secs: 1, + // indexer_grpc_response_item_timeout_secs: 1, + // }, + // processor_config: ProcessorConfig::EventsProcessor, + // backfill_config: Some(BackfillConfig { + // backfill_alias: backfill_alias.clone(), + // }), + // }; + // let conn_pool = new_db_pool( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .as_str(), + // Some(indexer_processor_config.db_config.db_pool_size), + // ) + // .await + // .expect("Failed to create connection pool"); + // run_migrations( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .clone(), + // conn_pool.clone(), + // ) + // .await; + + // let binding = conn_pool.clone(); + // let mut conn = binding.get().await.unwrap(); + + // diesel::insert_into(backfill_processor_status::table) + // .values(BackfillProcessorStatus { + // backfill_alias: backfill_alias.clone(), + // backfill_status: BackfillStatus::Complete, + // last_success_version: 10, + // last_transaction_timestamp: None, + // backfill_start_version: 0, + // backfill_end_version: 10, + // }) + // .execute(&mut conn) + // .await + // .expect("Failed to insert processor status"); + + // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + // .await + // .unwrap(); + // assert_eq!(starting_version, 0); + // } + + // #[tokio::test] + // async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { + // let mut db = PostgresTestDatabase::new(); + // let backfill_alias = "backfill_processor".to_string(); + // db.setup().await.unwrap(); + // let indexer_processor_config = IndexerProcessorConfig { + // db_config: DbConfig { + // postgres_connection_string: db.get_db_url(), + // db_pool_size: 2, + // }, + // transaction_stream_config: TransactionStreamConfig { + // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), + // starting_version: None, + // request_ending_version: None, + // auth_token: "test".to_string(), + // request_name_header: "test".to_string(), + // indexer_grpc_http2_ping_interval_secs: 1, + // indexer_grpc_http2_ping_timeout_secs: 1, + // indexer_grpc_reconnection_timeout_secs: 1, + // indexer_grpc_response_item_timeout_secs: 1, + // }, + // processor_config: ProcessorConfig::EventsProcessor, + // backfill_config: Some(BackfillConfig { + // backfill_alias: backfill_alias.clone(), + // }), + // }; + // let conn_pool = new_db_pool( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .as_str(), + // Some(indexer_processor_config.db_config.db_pool_size), + // ) + // .await + // .expect("Failed to create connection pool"); + // run_migrations( + // indexer_processor_config + // .db_config + // .postgres_connection_string + // .clone(), + // conn_pool.clone(), + // ) + // .await; + + // let binding = conn_pool.clone(); + // let mut conn = binding.get().await.unwrap(); + + // diesel::insert_into(backfill_processor_status::table) + // .values(BackfillProcessorStatus { + // backfill_alias: backfill_alias.clone(), + // backfill_status: BackfillStatus::InProgress, + // last_success_version: 10, + // last_transaction_timestamp: None, + // backfill_start_version: 0, + // backfill_end_version: 10, + // }) + // .execute(&mut conn) + // .await + // .expect("Failed to insert processor status"); + + // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + // .await + // .unwrap(); + // assert_eq!(starting_version, 11); + // } } From 2a750c4d8729e9ef8dd9d4409cc14f385357cd9e Mon Sep 17 00:00:00 2001 From: dermanyang Date: Thu, 17 Oct 2024 14:40:35 -0700 Subject: [PATCH 05/10] Simply abstractino to just produce the config --- .../src/utils/database.rs | 1 - .../src/utils/starting_version.rs | 380 ++++++------------ 2 files changed, 112 insertions(+), 269 deletions(-) diff --git a/aptos-indexer-processor-example/src/utils/database.rs b/aptos-indexer-processor-example/src/utils/database.rs index 984c706..8d2138f 100644 --- a/aptos-indexer-processor-example/src/utils/database.rs +++ b/aptos-indexer-processor-example/src/utils/database.rs @@ -291,7 +291,6 @@ pub async fn run_migrations(postgres_connection_string: String, _conn_pool: ArcD #[cfg(not(feature = "libpq"))] pub async fn run_migrations(postgres_connection_string: String, conn_pool: ArcDbPool) { use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; - info!("Running migrations: {:?}", postgres_connection_string); let conn = conn_pool // We need to use this since AsyncConnectionWrapper doesn't know how to diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 787017b..8600a03 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -101,79 +101,13 @@ mod tests { use diesel_async::RunQueryDsl; use url::Url; - struct TestContext { - conn_pool: ArcDbPool, - indexer_processor_config: IndexerProcessorConfig, - } - - impl TestContext { - async fn new(backfill_config: Option) -> Self { - let mut db = PostgresTestDatabase::new(); - db.setup().await.unwrap(); - let indexer_processor_config = IndexerProcessorConfig { - db_config: DbConfig { - postgres_connection_string: db.get_db_url(), - db_pool_size: 2, - }, - transaction_stream_config: TransactionStreamConfig { - indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - starting_version: None, - request_ending_version: None, - auth_token: "test".to_string(), - request_name_header: "test".to_string(), - indexer_grpc_http2_ping_interval_secs: 1, - indexer_grpc_http2_ping_timeout_secs: 1, - indexer_grpc_reconnection_timeout_secs: 1, - indexer_grpc_response_item_timeout_secs: 1, - }, - processor_config: ProcessorConfig::EventsProcessor, - backfill_config: backfill_config, - }; - - let conn_pool = new_db_pool( - indexer_processor_config - .db_config - .postgres_connection_string - .as_str(), - Some(indexer_processor_config.db_config.db_pool_size), - ) - .await - .expect("Failed to create connection pool"); - - run_migrations( - indexer_processor_config - .db_config - .postgres_connection_string - .clone(), - conn_pool.clone(), - ) - .await; - - TestContext { - conn_pool, - indexer_processor_config, - } - } - } - - #[tokio::test] - async fn test_with_test_context() { - let test_context = TestContext::new(None).await; - let config = test_context.indexer_processor_config; - let conn_pool = test_context.conn_pool; - - let starting_version = get_starting_version(&config, conn_pool).await.unwrap(); - - assert_eq!(starting_version, 0); - } - - #[tokio::test] - async fn test_without_test_context() { - let mut db = PostgresTestDatabase::new(); - db.setup().await.unwrap(); - let indexer_processor_config = IndexerProcessorConfig { + fn create_indexer_config( + db_url: String, + backfill_config: Option, + ) -> IndexerProcessorConfig { + return IndexerProcessorConfig { db_config: DbConfig { - postgres_connection_string: db.get_db_url(), + postgres_connection_string: db_url, db_pool_size: 2, }, transaction_stream_config: TransactionStreamConfig { @@ -188,25 +122,22 @@ mod tests { indexer_grpc_response_item_timeout_secs: 1, }, processor_config: ProcessorConfig::EventsProcessor, - backfill_config: None, + backfill_config: backfill_config, }; + } + + #[tokio::test] + async fn test_without_test_context() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None); let conn_pool = new_db_pool( - indexer_processor_config - .db_config - .postgres_connection_string - .as_str(), + db.get_db_url().as_str(), Some(indexer_processor_config.db_config.db_pool_size), ) .await .expect("Failed to create connection pool"); - run_migrations( - indexer_processor_config - .db_config - .postgres_connection_string - .clone(), - conn_pool.clone(), - ) - .await; + run_migrations(db.get_db_url(), conn_pool.clone()).await; let starting_version = get_starting_version(&indexer_processor_config, conn_pool) .await @@ -215,195 +146,108 @@ mod tests { assert_eq!(starting_version, 0); } - // #[tokio::test] - // async fn test_get_starting_version_with_checkpoint() { - // let mut db = PostgresTestDatabase::new(); - // db.setup().await.unwrap(); - // let indexer_processor_config = IndexerProcessorConfig { - // db_config: DbConfig { - // postgres_connection_string: db.get_db_url(), - // db_pool_size: 2, - // }, - // transaction_stream_config: TransactionStreamConfig { - // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - // starting_version: None, - // request_ending_version: None, - // auth_token: "test".to_string(), - // request_name_header: "test".to_string(), - // indexer_grpc_http2_ping_interval_secs: 1, - // indexer_grpc_http2_ping_timeout_secs: 1, - // indexer_grpc_reconnection_timeout_secs: 1, - // indexer_grpc_response_item_timeout_secs: 1, - // }, - // processor_config: ProcessorConfig::EventsProcessor, - // backfill_config: None, - // }; - // let conn_pool = new_db_pool( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .as_str(), - // Some(indexer_processor_config.db_config.db_pool_size), - // ) - // .await - // .expect("Failed to create connection pool"); - // run_migrations( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .clone(), - // conn_pool.clone(), - // ) - // .await; - - // let binding = conn_pool.clone(); - // let mut conn = binding.get().await.unwrap(); - // let processor_name = indexer_processor_config.processor_config.name(); - // diesel::insert_into(processor_status::table) - // .values(ProcessorStatus { - // processor: processor_name.to_string(), - // last_success_version: 10, - // last_transaction_timestamp: None, - // }) - // .execute(&mut conn) - // .await - // .expect("Failed to insert processor status"); - - // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) - // .await - // .unwrap(); - // assert_eq!(starting_version, 11); - // } + #[tokio::test] + async fn test_get_starting_version_with_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None); + let conn_pool = new_db_pool( + db.get_db_url().as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor_status::table) + .values(ProcessorStatus { + processor: indexer_processor_config.processor_config.name().to_string(), + last_success_version: 10, + last_transaction_timestamp: None, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); - // #[tokio::test] - // async fn test_backfill_get_starting_version_with_completed_checkpoint() { - // let mut db = PostgresTestDatabase::new(); - // let backfill_alias = "backfill_processor".to_string(); - // db.setup().await.unwrap(); - // let indexer_processor_config = IndexerProcessorConfig { - // db_config: DbConfig { - // postgres_connection_string: db.get_db_url(), - // db_pool_size: 2, - // }, - // transaction_stream_config: TransactionStreamConfig { - // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - // starting_version: None, - // request_ending_version: None, - // auth_token: "test".to_string(), - // request_name_header: "test".to_string(), - // indexer_grpc_http2_ping_interval_secs: 1, - // indexer_grpc_http2_ping_timeout_secs: 1, - // indexer_grpc_reconnection_timeout_secs: 1, - // indexer_grpc_response_item_timeout_secs: 1, - // }, - // processor_config: ProcessorConfig::EventsProcessor, - // backfill_config: Some(BackfillConfig { - // backfill_alias: backfill_alias.clone(), - // }), - // }; - // let conn_pool = new_db_pool( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .as_str(), - // Some(indexer_processor_config.db_config.db_pool_size), - // ) - // .await - // .expect("Failed to create connection pool"); - // run_migrations( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .clone(), - // conn_pool.clone(), - // ) - // .await; + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); - // let binding = conn_pool.clone(); - // let mut conn = binding.get().await.unwrap(); + assert_eq!(starting_version, 11); + } - // diesel::insert_into(backfill_processor_status::table) - // .values(BackfillProcessorStatus { - // backfill_alias: backfill_alias.clone(), - // backfill_status: BackfillStatus::Complete, - // last_success_version: 10, - // last_transaction_timestamp: None, - // backfill_start_version: 0, - // backfill_end_version: 10, - // }) - // .execute(&mut conn) - // .await - // .expect("Failed to insert processor status"); + #[tokio::test] + async fn test_backfill_get_starting_version_with_completed_checkpoint() { + let mut db = PostgresTestDatabase::new(); + let backfill_alias = "backfill_processor".to_string(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_alias: backfill_alias.clone(), + }), + ); + let conn_pool = new_db_pool( + db.get_db_url().as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status: BackfillStatus::Complete, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 10, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); - // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) - // .await - // .unwrap(); - // assert_eq!(starting_version, 0); - // } + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); - // #[tokio::test] - // async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { - // let mut db = PostgresTestDatabase::new(); - // let backfill_alias = "backfill_processor".to_string(); - // db.setup().await.unwrap(); - // let indexer_processor_config = IndexerProcessorConfig { - // db_config: DbConfig { - // postgres_connection_string: db.get_db_url(), - // db_pool_size: 2, - // }, - // transaction_stream_config: TransactionStreamConfig { - // indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - // starting_version: None, - // request_ending_version: None, - // auth_token: "test".to_string(), - // request_name_header: "test".to_string(), - // indexer_grpc_http2_ping_interval_secs: 1, - // indexer_grpc_http2_ping_timeout_secs: 1, - // indexer_grpc_reconnection_timeout_secs: 1, - // indexer_grpc_response_item_timeout_secs: 1, - // }, - // processor_config: ProcessorConfig::EventsProcessor, - // backfill_config: Some(BackfillConfig { - // backfill_alias: backfill_alias.clone(), - // }), - // }; - // let conn_pool = new_db_pool( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .as_str(), - // Some(indexer_processor_config.db_config.db_pool_size), - // ) - // .await - // .expect("Failed to create connection pool"); - // run_migrations( - // indexer_processor_config - // .db_config - // .postgres_connection_string - // .clone(), - // conn_pool.clone(), - // ) - // .await; + assert_eq!(starting_version, 0); + } - // let binding = conn_pool.clone(); - // let mut conn = binding.get().await.unwrap(); + #[tokio::test] + async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { + let mut db = PostgresTestDatabase::new(); + let backfill_alias = "backfill_processor".to_string(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_alias: backfill_alias.clone(), + }), + ); + let conn_pool = new_db_pool( + db.get_db_url().as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status: BackfillStatus::InProgress, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 10, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); - // diesel::insert_into(backfill_processor_status::table) - // .values(BackfillProcessorStatus { - // backfill_alias: backfill_alias.clone(), - // backfill_status: BackfillStatus::InProgress, - // last_success_version: 10, - // last_transaction_timestamp: None, - // backfill_start_version: 0, - // backfill_end_version: 10, - // }) - // .execute(&mut conn) - // .await - // .expect("Failed to insert processor status"); + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); - // let starting_version = get_starting_version(&indexer_processor_config, conn_pool) - // .await - // .unwrap(); - // assert_eq!(starting_version, 11); - // } + assert_eq!(starting_version, 11); + } } From 4676d64d7ac92d521f53692da1b8c2ff9ed7bd2b Mon Sep 17 00:00:00 2001 From: dermanyang Date: Thu, 17 Oct 2024 15:49:59 -0700 Subject: [PATCH 06/10] Revert config changes --- aptos-indexer-processor-example/config.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aptos-indexer-processor-example/config.yaml b/aptos-indexer-processor-example/config.yaml index 99e1474..f61f005 100644 --- a/aptos-indexer-processor-example/config.yaml +++ b/aptos-indexer-processor-example/config.yaml @@ -3,13 +3,13 @@ health_check_port: 8085 server_config: processor_config: type: "events_processor" - backfill_config: - backfill_alias: "events_processor_backfill_1" + # backfill_config: + # backfill_alias: "events_processor_backfill_1" transaction_stream_config: indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443" starting_version: 0 - request_ending_version: 100000 - auth_token: "aptoslabs_No5KWpMQKvz_7ohYQejzxmgpL27BS9gK784GpA14EP9mb" + # request_ending_version: 100000 + auth_token: "AUTH_TOKEN" request_name_header: "events-processor" db_config: postgres_connection_string: postgresql://postgres:@localhost:5432/example From fadfae49c3d993f76b87b48c6e0b41afab87f2ea Mon Sep 17 00:00:00 2001 From: dermanyang Date: Thu, 17 Oct 2024 15:57:54 -0700 Subject: [PATCH 07/10] Correct test name --- aptos-indexer-processor-example/src/utils/starting_version.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 8600a03..8ca0cf8 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -127,7 +127,7 @@ mod tests { } #[tokio::test] - async fn test_without_test_context() { + async fn test_get_starting_version_no_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); let indexer_processor_config = create_indexer_config(db.get_db_url(), None); From 2523b4e5ce301bedc6abc3b61845d02fb77887de Mon Sep 17 00:00:00 2001 From: dermanyang Date: Mon, 21 Oct 2024 11:33:09 -0700 Subject: [PATCH 08/10] Add test workflow --- .../.github/workflows/lint.yaml | 36 +++++++++++++++++++ .../.github/workflows/tests.yaml | 33 +++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 aptos-indexer-processor-example/.github/workflows/lint.yaml create mode 100644 aptos-indexer-processor-example/.github/workflows/tests.yaml diff --git a/aptos-indexer-processor-example/.github/workflows/lint.yaml b/aptos-indexer-processor-example/.github/workflows/lint.yaml new file mode 100644 index 0000000..374fd45 --- /dev/null +++ b/aptos-indexer-processor-example/.github/workflows/lint.yaml @@ -0,0 +1,36 @@ +name: "Lint" +on: + # Allow us to run this specific workflow without a PR + workflow_dispatch: + pull_request: + push: + branches: + - main + +# cancel redundant builds +concurrency: + # for push and workflow_dispatch events we use `github.sha` in the concurrency group and don't really cancel each other out/limit concurrency + # for pull_request events newer jobs cancel earlier jobs to save on CI etc. + group: ${{ github.workflow }}-${{ github.event_name }}-${{ (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.sha || github.head_ref || github.ref }} + cancel-in-progress: true + +jobs: + Lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install Dependencies and Run Linter + run: | + sudo apt update && sudo apt install libdw-dev + cargo install cargo-sort + rustup update + rustup toolchain install nightly + rustup component add clippy --toolchain nightly + rustup component add rustfmt --toolchain nightly + scripts/rust_lint.sh --check + working-directory: aptos-indexer-processors-sdk + + - name: Check Banned Dependencies + run: bash scripts/check_banned_deps.sh + working-directory: aptos-indexer-processors-sdk \ No newline at end of file diff --git a/aptos-indexer-processor-example/.github/workflows/tests.yaml b/aptos-indexer-processor-example/.github/workflows/tests.yaml new file mode 100644 index 0000000..0c1c847 --- /dev/null +++ b/aptos-indexer-processor-example/.github/workflows/tests.yaml @@ -0,0 +1,33 @@ +name: "Tests" +on: + workflow_dispatch: + pull_request: + push: + branches: + - main + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.sha || github.head_ref || github.ref }} + cancel-in-progress: true + +jobs: + Test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install Dependencies + run: | + sudo apt update && sudo apt install libdw-dev + cargo install cargo-sort + rustup update + rustup toolchain install nightly + working-directory: aptos-indexer-processors-sdk + + - name: Build with No Default Features + run: cargo build --no-default-features + working-directory: aptos-indexer-processors-sdk + + - name: Run Tests + run: cargo test + working-directory: aptos-indexer-processors-sdk \ No newline at end of file From b15b59b1c2c565a8759f9b1274d5330472db3878 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Mon, 21 Oct 2024 11:34:15 -0700 Subject: [PATCH 09/10] remove workflows --- .../.github/workflows/lint.yaml | 36 ------------------- .../.github/workflows/tests.yaml | 33 ----------------- 2 files changed, 69 deletions(-) delete mode 100644 aptos-indexer-processor-example/.github/workflows/lint.yaml delete mode 100644 aptos-indexer-processor-example/.github/workflows/tests.yaml diff --git a/aptos-indexer-processor-example/.github/workflows/lint.yaml b/aptos-indexer-processor-example/.github/workflows/lint.yaml deleted file mode 100644 index 374fd45..0000000 --- a/aptos-indexer-processor-example/.github/workflows/lint.yaml +++ /dev/null @@ -1,36 +0,0 @@ -name: "Lint" -on: - # Allow us to run this specific workflow without a PR - workflow_dispatch: - pull_request: - push: - branches: - - main - -# cancel redundant builds -concurrency: - # for push and workflow_dispatch events we use `github.sha` in the concurrency group and don't really cancel each other out/limit concurrency - # for pull_request events newer jobs cancel earlier jobs to save on CI etc. - group: ${{ github.workflow }}-${{ github.event_name }}-${{ (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.sha || github.head_ref || github.ref }} - cancel-in-progress: true - -jobs: - Lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Install Dependencies and Run Linter - run: | - sudo apt update && sudo apt install libdw-dev - cargo install cargo-sort - rustup update - rustup toolchain install nightly - rustup component add clippy --toolchain nightly - rustup component add rustfmt --toolchain nightly - scripts/rust_lint.sh --check - working-directory: aptos-indexer-processors-sdk - - - name: Check Banned Dependencies - run: bash scripts/check_banned_deps.sh - working-directory: aptos-indexer-processors-sdk \ No newline at end of file diff --git a/aptos-indexer-processor-example/.github/workflows/tests.yaml b/aptos-indexer-processor-example/.github/workflows/tests.yaml deleted file mode 100644 index 0c1c847..0000000 --- a/aptos-indexer-processor-example/.github/workflows/tests.yaml +++ /dev/null @@ -1,33 +0,0 @@ -name: "Tests" -on: - workflow_dispatch: - pull_request: - push: - branches: - - main - -concurrency: - group: ${{ github.workflow }}-${{ github.event_name }}-${{ (github.event_name == 'push' || github.event_name == 'workflow_dispatch') && github.sha || github.head_ref || github.ref }} - cancel-in-progress: true - -jobs: - Test: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Install Dependencies - run: | - sudo apt update && sudo apt install libdw-dev - cargo install cargo-sort - rustup update - rustup toolchain install nightly - working-directory: aptos-indexer-processors-sdk - - - name: Build with No Default Features - run: cargo build --no-default-features - working-directory: aptos-indexer-processors-sdk - - - name: Run Tests - run: cargo test - working-directory: aptos-indexer-processors-sdk \ No newline at end of file From fbbae1635e9dd857bbb35ddc6d3e37c261c303f1 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 22 Oct 2024 10:56:27 -0700 Subject: [PATCH 10/10] add test --- .../src/utils/starting_version.rs | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/aptos-indexer-processor-example/src/utils/starting_version.rs b/aptos-indexer-processor-example/src/utils/starting_version.rs index 8ca0cf8..d8955fe 100644 --- a/aptos-indexer-processor-example/src/utils/starting_version.rs +++ b/aptos-indexer-processor-example/src/utils/starting_version.rs @@ -70,7 +70,7 @@ async fn get_latest_processed_version_from_db( .context("Failed to query processor_status table.")?; // Return None if there is no checkpoint. Otherwise, - // return the higher of the checkpointed version + 1 the `starting_version`. + // return the higher of the checkpointed version + 1 and `starting_version`. Ok(status.map(|status| { std::cmp::max( status.last_success_version as u64 + 1, @@ -104,6 +104,7 @@ mod tests { fn create_indexer_config( db_url: String, backfill_config: Option, + starting_version: Option, ) -> IndexerProcessorConfig { return IndexerProcessorConfig { db_config: DbConfig { @@ -112,7 +113,7 @@ mod tests { }, transaction_stream_config: TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - starting_version: None, + starting_version: starting_version, request_ending_version: None, auth_token: "test".to_string(), request_name_header: "test".to_string(), @@ -130,7 +131,7 @@ mod tests { async fn test_get_starting_version_no_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); let conn_pool = new_db_pool( db.get_db_url().as_str(), Some(indexer_processor_config.db_config.db_pool_size), @@ -146,11 +147,31 @@ mod tests { assert_eq!(starting_version, 0); } + #[tokio::test] + async fn test_get_starting_version_no_checkpoint_with_start_ver() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(5)); + let conn_pool = new_db_pool( + db.get_db_url().as_str(), + Some(indexer_processor_config.db_config.db_pool_size), + ) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 5); + } + #[tokio::test] async fn test_get_starting_version_with_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None); + let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); let conn_pool = new_db_pool( db.get_db_url().as_str(), Some(indexer_processor_config.db_config.db_pool_size), @@ -185,6 +206,7 @@ mod tests { Some(BackfillConfig { backfill_alias: backfill_alias.clone(), }), + None, ); let conn_pool = new_db_pool( db.get_db_url().as_str(), @@ -223,6 +245,7 @@ mod tests { Some(BackfillConfig { backfill_alias: backfill_alias.clone(), }), + None, ); let conn_pool = new_db_pool( db.get_db_url().as_str(),