diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 970a0c382..51f288ca2 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -141,7 +141,7 @@ checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "aptos-indexer-processor-sdk" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "ahash", "anyhow", @@ -174,7 +174,7 @@ dependencies = [ [[package]] name = "aptos-indexer-processor-sdk-server-framework" version = "1.0.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "anyhow", "aptos-indexer-processor-sdk", @@ -207,7 +207,7 @@ dependencies = [ [[package]] name = "aptos-indexer-testing-framework" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "anyhow", "aptos-indexer-processor-sdk", @@ -235,10 +235,10 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "anyhow", - "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc)", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a)", "aptos-protos 1.3.1 (git+https://github.com/aptos-labs/aptos-core.git?rev=5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb)", "chrono", "futures-util", @@ -263,7 +263,7 @@ dependencies = [ [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "chrono", ] @@ -2247,7 +2247,7 @@ dependencies = [ [[package]] name = "instrumented-channel" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "delegate", "derive_builder", @@ -4130,7 +4130,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "sample" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=c8f12a75c3648fba889c3da657ff713cd5e676dc#c8f12a75c3648fba889c3da657ff713cd5e676dc" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=e6867c50a2c30ef16ad6f82e02313b2ba5ce361a#e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" dependencies = [ "tracing", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b0205028e..40095ec30 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,12 +30,12 @@ testing-transactions = { path = "testing-transactions" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "c8f12a75c3648fba889c3da657ff713cd5e676dc" } -aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "c8f12a75c3648fba889c3da657ff713cd5e676dc" } +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" } +aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4b67e5d816d554b5af4b2c130b283d00799268b7" } -aptos-indexer-testing-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "c8f12a75c3648fba889c3da657ff713cd5e676dc" } +aptos-indexer-testing-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "e6867c50a2c30ef16ad6f82e02313b2ba5ce361a" } async-trait = "0.1.53" backtrace = "0.3.58" base64 = "0.13.0" diff --git a/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs b/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs index 5dfbb77b5..f8cbf8ac3 100644 --- a/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs @@ -9,12 +9,9 @@ use std::collections::HashSet; pub async fn setup_acc_txn_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); // since this will be always 1, we can remove from the arg list + let transaction_stream_config = test_context.create_transaction_stream_config(); let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -69,7 +66,6 @@ mod tests { async fn mainnet_acc_txns_processor() { process_single_mainnet_txn( IMPORTED_MAINNET_TXNS_145959468_ACCOUNT_TRANSACTION, - 145959468, Some("account_transaction_test".to_string()), ) .await; @@ -83,18 +79,13 @@ mod tests { async fn mainnet_acc_txns_processor_delete() { process_single_mainnet_txn( IMPORTED_MAINNET_TXNS_423176063_ACCOUNT_TRANSACTION_DELETE, - 423176063, Some("account_transaction_delete_test".to_string()), ) .await; } // Helper function to abstract out the single transaction processing - async fn process_single_mainnet_txn( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_mainnet_txn(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path .unwrap_or_else(|| format!("{}/imported_mainnet_txns", DEFAULT_OUTPUT_FOLDER)); @@ -103,7 +94,7 @@ mod tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_acc_txn_processor_config(&test_context, txn_version as u64, 1, &db_url).await; + setup_acc_txn_processor_config(&test_context, &db_url).await; let acc_txns_processor = AccountTransactionsProcessor::new(indexer_processor_config) .await @@ -114,7 +105,6 @@ mod tests { acc_txns_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -124,18 +114,18 @@ mod tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs b/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs index 895744a48..a4d925284 100644 --- a/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs @@ -12,12 +12,9 @@ use std::collections::HashSet; pub async fn setup_ans_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); // since this will be always 1, we can remove from the arg list + let transaction_stream_config = test_context.create_transaction_stream_config(); // since this will be always 1, we can remove from the arg list let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -82,7 +79,6 @@ mod tests { async fn mainnet_current_ans_primary_name_v2() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_1056780409_ANS_CURRENT_ANS_PRIMARY_NAME_V2, - 1056780409, Some("test_current_ans_primary_name_v2".to_string()), ) .await; @@ -98,7 +94,6 @@ mod tests { async fn mainnet_ans_lookup_v2() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_303690531_ANS_LOOKUP_V2, - 303690531, Some("test_ans_lookup_v2".to_string()), ) .await; @@ -114,17 +109,12 @@ mod tests { async fn mainnet_current_ans_lookup_v2() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_438536688_ANS_CURRENT_ANS_LOOKUP_V2, - 438536688, Some("test_current_ans_lookup_v2".to_string()), ) .await; } // Helper function to abstract out the single transaction processing - async fn process_single_mainnet_event_txn( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_mainnet_event_txn(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path .unwrap_or_else(|| format!("{}/imported_mainnet_txns", DEFAULT_OUTPUT_FOLDER)); @@ -133,7 +123,7 @@ mod tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_ans_processor_config(&test_context, txn_version as u64, 1, &db_url).await; + setup_ans_processor_config(&test_context, &db_url).await; let ans_processor = AnsProcessor::new(indexer_processor_config) .await @@ -144,7 +134,6 @@ mod tests { ans_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -154,18 +143,18 @@ mod tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/default_processor_tests.rs b/rust/integration-tests/src/sdk_tests/default_processor_tests.rs index 410281248..72676c0cb 100644 --- a/rust/integration-tests/src/sdk_tests/default_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/default_processor_tests.rs @@ -9,12 +9,9 @@ use std::collections::HashSet; pub async fn setup_default_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); // since this will be always 1, we can remove from the arg list + let transaction_stream_config = test_context.create_transaction_stream_config(); // since this will be always 1, we can remove from the arg list let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -62,7 +59,6 @@ mod tests { async fn mainnet_table_items() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_155112189_DEFAULT_TABLE_ITEMS, - 155112189, Some("test_table_items".to_string()), ) .await; @@ -72,7 +68,6 @@ mod tests { async fn mainnet_current_table_items() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_1845035942_DEFAULT_CURRENT_TABLE_ITEMS, - 1845035942, Some("test_current_table_items".to_string()), ) .await; @@ -82,18 +77,13 @@ mod tests { async fn mainnet_block_metadata_txns() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_513424821_DEFAULT_BLOCK_METADATA_TRANSACTIONS, - 513424821, Some("block_metadata_transactions".to_string()), ) .await; } // Helper function to abstract out the single transaction processing - async fn process_single_mainnet_event_txn( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_mainnet_event_txn(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path .unwrap_or_else(|| format!("{}/imported_mainnet_txns", DEFAULT_OUTPUT_FOLDER)); @@ -102,7 +92,7 @@ mod tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_default_processor_config(&test_context, txn_version as u64, 1, &db_url).await; + setup_default_processor_config(&test_context, &db_url).await; let default_processor = DefaultProcessor::new(indexer_processor_config) .await @@ -113,7 +103,6 @@ mod tests { default_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -123,18 +112,18 @@ mod tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs index cda78a263..d5f6086a8 100644 --- a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs @@ -9,12 +9,9 @@ use std::collections::HashSet; pub async fn setup_events_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); // since this will be always 1, we can remove from the arg list + let transaction_stream_config = test_context.create_transaction_stream_config(); // since this will be always 1, we can remove from the arg list let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -67,7 +64,6 @@ mod tests { async fn testnet_events_processor_genesis_txn() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_1_GENESIS, - 1, Some("genesis_txn_test".to_string()), ) .await; @@ -77,7 +73,6 @@ mod tests { async fn testnet_events_processor_new_block_event() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_2_NEW_BLOCK_EVENT, - 2, Some("new_block_event_test".to_string()), ) .await; @@ -87,7 +82,6 @@ mod tests { async fn testnet_events_processor_empty_txn() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_3_EMPTY_TXN, - 3, Some("empty_txn_test".to_string()), ) .await; @@ -97,7 +91,6 @@ mod tests { async fn testnet_events_processor_coin_register_fa_metadata() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_278556781_V1_COIN_REGISTER_FA_METADATA, - 278556781, Some("coin_register_fa_metadata_test".to_string()), ) .await; @@ -107,7 +100,6 @@ mod tests { async fn testnet_events_processor_fa_metadata() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_1255836496_V2_FA_METADATA_, - 1255836496, Some("fa_metadata_test".to_string()), ) .await; @@ -117,7 +109,6 @@ mod tests { async fn testnet_events_processor_fa_activities() { process_single_testnet_event_txn( IMPORTED_TESTNET_TXNS_5992795934_FA_ACTIVITIES, - 5992795934, Some("fa_activities_test".to_string()), ) .await; @@ -126,12 +117,8 @@ mod tests { /// Example test case of not using custom name #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn testnet_events_processor_coin_register() { - process_single_testnet_event_txn( - IMPORTED_TESTNET_TXNS_5979639459_COIN_REGISTER, - 5979639459, - None, - ) - .await; + process_single_testnet_event_txn(IMPORTED_TESTNET_TXNS_5979639459_COIN_REGISTER, None) + .await; } // Example 2: Test for multiple transactions handling @@ -156,13 +143,8 @@ mod tests { let starting_version = transaction_batches[0].version; let db_url = db.get_db_url(); - let (indexer_processor_config, _processor_name) = setup_events_processor_config( - &test_context, - starting_version, - transaction_batches.len(), - &db_url, - ) - .await; + let (indexer_processor_config, _processor_name) = + setup_events_processor_config(&test_context, &db_url).await; let events_processor = EventsProcessor::new(indexer_processor_config) .await @@ -173,7 +155,6 @@ mod tests { events_processor, load_data, db_url, - vec![5523474016, 5979639459], diff_flag, output_path.clone(), Some("multi_txns_handling_test".to_string()), @@ -200,11 +181,7 @@ mod tests { } // Helper function to abstract out the single transaction processing - async fn process_single_testnet_event_txn( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_testnet_event_txn(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path .unwrap_or_else(|| format!("{}/imported_testnet_txns", DEFAULT_OUTPUT_FOLDER)); @@ -213,7 +190,7 @@ mod tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_events_processor_config(&test_context, txn_version as u64, 1, &db_url).await; + setup_events_processor_config(&test_context, &db_url).await; let events_processor = EventsProcessor::new(indexer_processor_config) .await @@ -224,7 +201,6 @@ mod tests { events_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -234,18 +210,18 @@ mod tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs index 0f21d224d..2c9105b85 100644 --- a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs @@ -9,12 +9,9 @@ use std::collections::HashSet; pub fn setup_fa_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); + let transaction_stream_config = test_context.create_transaction_stream_config(); let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -68,7 +65,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_validator_txn() { process_single_testnet_fa_txns( IMPORTED_TESTNET_TXNS_5523474016_VALIDATOR_TXN, - 5523474016, Some("validator_txn_test".to_string()), ) .await; @@ -79,7 +75,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_coin_register_txn() { process_single_testnet_fa_txns( IMPORTED_TESTNET_TXNS_5979639459_COIN_REGISTER, - 5979639459, Some("coin_register_txn_test".to_string()), ) .await; @@ -89,7 +84,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_fa_activities_txn() { process_single_testnet_fa_txns( IMPORTED_TESTNET_TXNS_5992795934_FA_ACTIVITIES, - 5992795934, Some("fa_activities_txn_test".to_string()), ) .await; @@ -112,7 +106,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_coin_and_fa_transfers() { process_single_testnet_fa_txns( IMPORTED_MAINNET_TXNS_999929475_COIN_AND_FA_TRANSFERS, - 999929475, Some("coin_and_fa_transfers_test".to_string()), ) .await; @@ -129,7 +122,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_v1_events() { process_single_testnet_fa_txns( IMPORTED_MAINNET_TXNS_508365567_FA_V1_EVENTS, - 508365567, Some("v1_events_test".to_string()), ) .await; @@ -144,7 +136,6 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_v2_frozen_event() { process_single_testnet_fa_txns( IMPORTED_TESTNET_TXNS_1200394037_FA_V2_FROZEN_EVENT, - 1200394037, Some("v2_frozen_event_test".to_string()), ) .await; @@ -160,18 +151,13 @@ mod sdk_fungible_asset_processor_tests { async fn test_fungible_asset_processor_concurrent_fa() { process_single_testnet_fa_txns( IMPORTED_TESTNET_TXNS_2646510387_CONCURRENT_FA, - 2646510387, Some("concurrent_fa_test".to_string()), ) .await; } // Helper function to abstract out the transaction processing - async fn process_single_testnet_fa_txns( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_testnet_fa_txns(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path.unwrap_or_else(|| DEFAULT_OUTPUT_FOLDER.to_string()); @@ -179,7 +165,7 @@ mod sdk_fungible_asset_processor_tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_fa_processor_config(&test_context, txn_version as u64, 1, &db_url); + setup_fa_processor_config(&test_context, &db_url); let fungible_asset_processor = FungibleAssetProcessor::new(indexer_processor_config) .await @@ -190,7 +176,6 @@ mod sdk_fungible_asset_processor_tests { fungible_asset_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -200,18 +185,18 @@ mod sdk_fungible_asset_processor_tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/mod.rs b/rust/integration-tests/src/sdk_tests/mod.rs index 95f8670e5..02f7de398 100644 --- a/rust/integration-tests/src/sdk_tests/mod.rs +++ b/rust/integration-tests/src/sdk_tests/mod.rs @@ -54,24 +54,6 @@ pub fn read_and_parse_json(path: &str) -> anyhow::Result { } } -#[allow(dead_code)] -pub fn get_transaction_version_from_test_context(test_context: &SdkTestContext) -> Vec { - test_context - .transaction_batches - .iter() - .map(|txn| txn.version) - .collect() -} - -#[allow(dead_code)] -pub fn get_all_version_from_test_context(test_context: &SdkTestContext) -> Vec { - test_context - .transaction_batches - .iter() - .map(|txn| txn.version as i64) - .collect() -} - // Common setup for database and test context #[allow(dead_code)] pub async fn setup_test_environment( @@ -80,7 +62,10 @@ pub async fn setup_test_environment( let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let test_context = SdkTestContext::new(transactions).await.unwrap(); + let mut test_context = SdkTestContext::new(transactions); + if test_context.init_mock_grpc().await.is_err() { + panic!("Failed to initialize mock grpc"); + }; (db, test_context) } @@ -144,7 +129,6 @@ pub async fn run_processor_test( processor: impl ProcessorTrait, load_data: F, db_url: String, - txn_versions: Vec, generate_file_flag: bool, output_path: String, custom_file_name: Option, @@ -155,32 +139,34 @@ where + Sync + 'static, { + let txn_versions: Vec = test_context + .get_test_transaction_versions() + .into_iter() + .map(|v| v as i64) + .collect(); + let db_values = test_context .run( &processor, - txn_versions[0] as u64, generate_file_flag, output_path.clone(), custom_file_name, move || { - let mut conn = - PgConnection::establish(&db_url).expect("Failed to establish DB connection"); - - let starting_version = txn_versions[0]; - let ending_version = txn_versions[txn_versions.len() - 1]; + let mut conn = PgConnection::establish(&db_url).unwrap_or_else(|e| { + eprintln!("[ERROR] Failed to establish DB connection: {:?}", e); + panic!("Failed to establish DB connection: {:?}", e); + }); - let db_values = match load_data(&mut conn, txn_versions) { + let db_values = match load_data(&mut conn, txn_versions.clone()) { Ok(db_data) => db_data, Err(e) => { - eprintln!( - "[ERROR] Failed to load data {}", e - ); + eprintln!("[ERROR] Failed to load data {}", e); return Err(e); }, }; if db_values.is_empty() { - eprintln!("[WARNING] No data found for starting txn version: {} and ending txn version {}", starting_version, ending_version); + eprintln!("[WARNING] No data found for versions: {:?}", txn_versions); } Ok(db_values) diff --git a/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs b/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs index 26517e0e5..fea1d3d25 100644 --- a/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs @@ -12,12 +12,9 @@ use std::collections::HashSet; pub fn setup_objects_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); + let transaction_stream_config = test_context.create_transaction_stream_config(); let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -57,7 +54,6 @@ mod sdk_objects_processor_tests { use crate::{ diff_test_helper::objects_processor::load_data, sdk_tests::{ - get_all_version_from_test_context, get_transaction_version_from_test_context, run_processor_test, setup_test_environment, validate_json, DEFAULT_OUTPUT_FOLDER, }, }; @@ -90,18 +86,9 @@ mod sdk_objects_processor_tests { let (db, mut test_context) = setup_test_environment(txns).await; - let starting_version = *get_transaction_version_from_test_context(&test_context) - .first() - .unwrap(); - let all_txn_versions = get_all_version_from_test_context(&test_context); - let db_url = db.get_db_url(); - let (indexer_processor_config, processor_name) = setup_objects_processor_config( - &test_context, - starting_version, - all_txn_versions.len(), - &db_url, - ); + let (indexer_processor_config, processor_name) = + setup_objects_processor_config(&test_context, &db_url); let objects_processor = ObjectsProcessor::new(indexer_processor_config) .await @@ -112,7 +99,6 @@ mod sdk_objects_processor_tests { objects_processor, load_data, db_url, - all_txn_versions, diff_flag, output_path.clone(), test_case_name.clone(), @@ -122,18 +108,18 @@ mod sdk_objects_processor_tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - starting_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs b/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs index 7722c4af3..10d5b89b5 100644 --- a/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs @@ -12,12 +12,9 @@ use std::collections::HashSet; pub async fn setup_stake_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); // since this will be always 1, we can remove from the arg list + let transaction_stream_config = test_context.create_transaction_stream_config(); // since this will be always 1, we can remove from the arg list let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -76,7 +73,6 @@ mod tests { async fn mainnet_stake_pool_delegation_txn() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_1831971037_STAKE_DELEGATION_POOL, - 1831971037, Some("stake_pool_del_test".to_string()), ) .await; @@ -90,7 +86,6 @@ mod tests { async fn mainnet_stake_gov_record_txn() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_1830706009_STAKER_GOVERNANCE_RECORD, - 1830706009, Some("stake_gov_record_test".to_string()), ) .await; @@ -104,7 +99,6 @@ mod tests { async fn mainnet_stake_processor_genesis_txn() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_121508544_STAKE_DISTRIBUTE, - 121508544, Some("stake_distribute_test".to_string()), ) .await; @@ -117,7 +111,6 @@ mod tests { async fn mainnet_stake_processor_fa_metadata() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_139449359_STAKE_REACTIVATE, - 139449359, Some("stake_reactivate_test".to_string()), ) .await; @@ -130,18 +123,13 @@ mod tests { async fn mainnet_stake_processor_fa_activities() { process_single_mainnet_event_txn( IMPORTED_MAINNET_TXNS_4827964_STAKE_INITIALIZE, - 4827964, Some("stake_initialize_test".to_string()), ) .await; } // Helper function to abstract out the single transaction processing - async fn process_single_mainnet_event_txn( - txn: &[u8], - txn_version: i64, - test_case_name: Option, - ) { + async fn process_single_mainnet_event_txn(txn: &[u8], test_case_name: Option) { let (diff_flag, custom_output_path) = get_test_config(); let output_path = custom_output_path .unwrap_or_else(|| format!("{}/imported_mainnet_txns", DEFAULT_OUTPUT_FOLDER)); @@ -150,7 +138,7 @@ mod tests { let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_stake_processor_config(&test_context, txn_version as u64, 1, &db_url).await; + setup_stake_processor_config(&test_context, &db_url).await; let stake_processor = StakeProcessor::new(indexer_processor_config) .await @@ -161,7 +149,6 @@ mod tests { stake_processor, load_data, db_url, - vec![txn_version], diff_flag, output_path.clone(), test_case_name.clone(), @@ -171,18 +158,18 @@ mod tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } } diff --git a/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs b/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs index bce76a7ab..de8c3e76e 100644 --- a/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs @@ -12,12 +12,9 @@ use std::collections::HashSet; pub fn setup_token_v2_processor_config( test_context: &SdkTestContext, - staring_version: u64, - txn_count: usize, db_url: &str, ) -> (IndexerProcessorConfig, &'static str) { - let transaction_stream_config = - test_context.create_transaction_stream_config(staring_version, txn_count as u64); + let transaction_stream_config = test_context.create_transaction_stream_config(); let postgres_config = PostgresConfig { connection_string: db_url.to_string(), db_pool_size: 100, @@ -56,8 +53,7 @@ mod sdk_token_v2_processor_tests { use crate::{ diff_test_helper::token_v2_processor::load_data, sdk_tests::{ - get_transaction_version_from_test_context, run_processor_test, setup_test_environment, - validate_json, DEFAULT_OUTPUT_FOLDER, + run_processor_test, setup_test_environment, validate_json, DEFAULT_OUTPUT_FOLDER, }, }; use aptos_indexer_test_transactions::{ @@ -341,12 +337,9 @@ mod sdk_token_v2_processor_tests { let (db, mut test_context) = setup_test_environment(&[txn]).await; - let txn_version = *get_transaction_version_from_test_context(&test_context) - .first() - .unwrap(); let db_url = db.get_db_url(); let (indexer_processor_config, processor_name) = - setup_token_v2_processor_config(&test_context, txn_version, 1, &db_url); + setup_token_v2_processor_config(&test_context, &db_url); let token_v2_processor = TokenV2Processor::new(indexer_processor_config) .await @@ -357,7 +350,6 @@ mod sdk_token_v2_processor_tests { token_v2_processor, load_data, db_url, - vec![txn_version as i64], diff_flag, output_path.clone(), test_case_name.clone(), @@ -367,18 +359,18 @@ mod sdk_token_v2_processor_tests { Ok(mut db_value) => { let _ = validate_json( &mut db_value, - txn_version as u64, + test_context.get_request_start_version(), processor_name, output_path.clone(), test_case_name, ); }, Err(e) => { - eprintln!( - "[ERROR] Failed to run processor for txn version {}: {}", - 1, e + panic!( + "Test failed on transactions {:?} due to processor error: {}", + test_context.get_test_transaction_versions(), + e ); - panic!("Test failed due to processor error"); }, } }