From aaffc262d26f166c7e178989c591c9bf8dc4f872 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 6 Nov 2023 15:12:06 +1300 Subject: [PATCH 01/11] refactor: Dont push to SQS during historical backfill --- .../src/historical_block_processing.rs | 61 +------------------ .../src/indexer_registry.rs | 2 +- 2 files changed, 3 insertions(+), 60 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 82b60b8c3..c881db932 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -1,15 +1,13 @@ -use crate::indexer_types::{IndexerFunction, IndexerQueueMessage}; +use crate::indexer_types::IndexerFunction; use crate::opts::{Opts, Parser}; -use crate::queue; use crate::s3; use anyhow::{bail, Context}; use aws_sdk_s3::Client as S3Client; use aws_sdk_s3::Config; -use aws_sdk_sqs::Client; use aws_types::SdkConfig; use chrono::{DateTime, LocalResult, TimeZone, Utc}; use indexer_rule_type::indexer_rule::MatchingRule; -use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload}; +use indexer_rules_engine::types::indexer_rule_match::ChainId; use near_jsonrpc_client::JsonRpcClient; use near_jsonrpc_primitives::types::blocks::RpcBlockRequest; use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference}; @@ -96,17 +94,12 @@ pub(crate) async fn process_historical_messages( ); let chain_id = opts.chain_id().clone(); - let aws_region = opts.aws_queue_region.clone(); - let queue_client = queue::queue_client(aws_region, opts.queue_credentials()); - let queue_url = opts.start_from_block_queue_url.clone(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); let start_date = lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; - let mut indexer_function = indexer_function.clone(); - let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?; let last_indexed_block = last_indexed_block; @@ -138,8 +131,6 @@ pub(crate) async fn process_historical_messages( blocks_from_index.append(&mut blocks_between_indexed_and_current_block); - let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block); - if !blocks_from_index.is_empty() { storage::sadd( redis_connection_manager, @@ -163,18 +154,6 @@ pub(crate) async fn process_historical_messages( &[("block_height", current_block)], ) .await?; - - send_execution_message( - block_height, - first_block_in_index, - chain_id.clone(), - &queue_client, - queue_url.clone(), - &mut indexer_function, - current_block, - None, - ) - .await; } } } @@ -409,42 +388,6 @@ fn normalize_block_height(block_height: BlockHeight) -> String { format!("{:0>12}", block_height) } -async fn send_execution_message( - block_height: BlockHeight, - first_block: BlockHeight, - chain_id: ChainId, - queue_client: &Client, - queue_url: String, - indexer_function: &mut IndexerFunction, - current_block: u64, - payload: Option, -) { - // only request provisioning on the first block - if current_block != first_block { - indexer_function.provisioned = true; - } - - let msg = IndexerQueueMessage { - chain_id, - indexer_rule_id: 0, - indexer_rule_name: indexer_function.function_name.clone(), - payload, - block_height: current_block, - indexer_function: indexer_function.clone(), - is_historical: true, - }; - - match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await { - Ok(_) => {} - Err(err) => tracing::error!( - target: crate::INDEXER, - "#{} an error occurred when sending messages to the queue\n{:#?}", - block_height, - err - ), - } -} - // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks pub async fn lookup_block_date_or_next_block_date( block_height: u64, diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 2c79cc70c..fa14ac49a 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -171,7 +171,7 @@ fn index_and_process_register_calls( if let Some(thread) = crate::historical_block_processing::spawn_historical_message_thread( block_height, - &mut new_indexer_function, + &new_indexer_function, context.redis_connection_manager, ) { From 7f0611f0df22e14a42d62801453fe410b12e6593 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 6 Nov 2023 15:26:46 +1300 Subject: [PATCH 02/11] refactor: Pass round `s3_client` rather than `s3_config` --- .../src/historical_block_processing.rs | 19 +++--- ...ical_block_processing_integration_tests.rs | 16 +++-- indexer/queryapi_coordinator/src/s3.rs | 62 ++++++++++--------- 3 files changed, 56 insertions(+), 41 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index c881db932..ebb3bbe18 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -96,17 +96,20 @@ pub(crate) async fn process_historical_messages( let chain_id = opts.chain_id().clone(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client: S3Client = S3Client::from_conf(s3_config); + let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); let start_date = lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; - let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?; + let last_indexed_block = last_indexed_block_from_metadata(&s3_client).await?; let last_indexed_block = last_indexed_block; let mut blocks_from_index = filter_matching_blocks_from_index_files( start_block, &indexer_function, - aws_config, + &s3_client, start_date, ) .await?; @@ -161,11 +164,9 @@ pub(crate) async fn process_historical_messages( } pub(crate) async fn last_indexed_block_from_metadata( - aws_config: &SdkConfig, + s3_client: &S3Client, ) -> anyhow::Result { let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json"); - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?; let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap(); @@ -186,7 +187,7 @@ pub(crate) async fn last_indexed_block_from_metadata( pub(crate) async fn filter_matching_blocks_from_index_files( start_block_height: BlockHeight, indexer_function: &IndexerFunction, - aws_config: &SdkConfig, + s3_client: &S3Client, start_date: DateTime, ) -> anyhow::Result> { let s3_bucket = INDEXED_DATA_FILES_BUCKET; @@ -203,7 +204,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files( needs_dedupe_and_sort = true; } s3::fetch_contract_index_files( - aws_config, + s3_client, s3_bucket, INDEXED_ACTIONS_FILES_FOLDER, start_date, @@ -310,7 +311,7 @@ async fn filter_matching_unindexed_blocks_from_lake( for current_block in (last_indexed_block + 1)..ending_block_height { // fetch block file from S3 let key = format!("{}/block.json", normalize_block_height(current_block)); - let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await; + let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await; if s3_result.is_err() { let error = s3_result.err().unwrap(); @@ -341,7 +342,7 @@ async fn filter_matching_unindexed_blocks_from_lake( normalize_block_height(current_block), shard_id ); - let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?; + let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await?; match serde_json::from_slice::( shard.as_ref(), ) { diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 69ca67af9..857b08be2 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -38,9 +38,11 @@ mod tests { async fn test_indexing_metadata_file() { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let last_indexed_block = - historical_block_processing::last_indexed_block_from_metadata(aws_config) + historical_block_processing::last_indexed_block_from_metadata(&s3_client) .await .unwrap(); let a: Range = 90000000..9000000000; // valid for the next 300 years @@ -76,11 +78,13 @@ mod tests { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let redis_connection_manager = storage::connect(&opts.redis_connection_string) .await .unwrap(); let fake_block_height = - historical_block_processing::last_indexed_block_from_metadata(aws_config) + historical_block_processing::last_indexed_block_from_metadata(&s3_client) .await .unwrap(); let result = historical_block_processing::process_historical_messages( @@ -120,6 +124,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 77016214; let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 03) @@ -130,7 +136,7 @@ mod tests { let blocks = filter_matching_blocks_from_index_files( start_block_height, &indexer_function, - aws_config, + &s3_client, datetime_utc, ) .await; @@ -177,6 +183,8 @@ mod tests { let opts = Opts::test_opts_with_aws(); let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 45894620; let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 08, 01) @@ -187,7 +195,7 @@ mod tests { let blocks = filter_matching_blocks_from_index_files( start_block_height, &indexer_function, - aws_config, + &s3_client, datetime_utc, ) .await; diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index f88323688..cf1defc67 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -1,7 +1,5 @@ use anyhow::{bail, Context, Result}; use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; -use aws_types::SdkConfig; use chrono::{DateTime, NaiveDate, Utc}; use futures::future::try_join_all; @@ -17,7 +15,7 @@ fn storage_path_for_account(account: &str) -> String { } pub async fn find_index_files_by_pattern( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, pattern: &str, @@ -29,10 +27,10 @@ pub async fn find_index_files_by_pattern( for account in account_array { let account = account.trim(); let sub_results = if account.contains('*') { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await? + list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &account).await? } else { list_s3_bucket_by_prefix( - aws_config, + s3_client, s3_bucket, &format!("{}/{}/", s3_folder, storage_path_for_account(account)), ) @@ -43,11 +41,11 @@ pub async fn find_index_files_by_pattern( results } x if x.contains('*') => { - list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await? + list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &x).await? } _ => { list_s3_bucket_by_prefix( - aws_config, + s3_client, s3_bucket, &format!("{}/{}/", s3_folder, storage_path_for_account(pattern),), ) @@ -57,7 +55,7 @@ pub async fn find_index_files_by_pattern( } async fn list_index_files_by_wildcard( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, pattern: &&str, @@ -67,24 +65,20 @@ async fn list_index_files_by_wildcard( let path = storage_path_for_account(&pattern); let folders = - list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path)) - .await?; + list_s3_bucket_by_prefix(s3_client, s3_bucket, &format!("{}/{}/", s3_folder, path)).await?; // for each matching folder list files let mut results = vec![]; for folder in folders { - results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?); + results.extend(list_s3_bucket_by_prefix(s3_client, s3_bucket, &folder).await?); } Ok(results) } async fn list_s3_bucket_by_prefix( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_prefix: &str, ) -> Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - let mut results = vec![]; let mut continuation_token: Option = None; @@ -126,18 +120,15 @@ async fn list_s3_bucket_by_prefix( } pub async fn fetch_contract_index_files( - aws_config: &SdkConfig, + s3_client: &S3Client, s3_bucket: &str, s3_folder: &str, start_date: DateTime, contract_pattern: &str, ) -> Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - // list all index files let file_list = - find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await?; + find_index_files_by_pattern(s3_client, s3_bucket, s3_folder, contract_pattern).await?; let fetch_and_parse_tasks = file_list .into_iter() @@ -146,7 +137,7 @@ pub async fn fetch_contract_index_files( let s3_client = s3_client.clone(); async move { // Fetch the file - fetch_text_file_from_s3(s3_bucket, key, s3_client).await + fetch_text_file_from_s3(s3_bucket, key, &s3_client).await } }) .collect::>(); @@ -162,7 +153,7 @@ pub async fn fetch_contract_index_files( pub async fn fetch_text_file_from_s3( s3_bucket: &str, key: String, - s3_client: S3Client, + s3_client: &S3Client, ) -> Result { // todo: can we retry if this fails like the lake s3_fetcher fn does? // If so, can we differentiate between a file not existing (block height does not exist) and a network error? @@ -216,9 +207,12 @@ mod tests { #[tokio::test] async fn list_delta_bucket() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = list_s3_bucket_by_prefix( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()), ) @@ -231,9 +225,12 @@ mod tests { #[tokio::test] async fn list_with_single_contract() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near", @@ -247,9 +244,12 @@ mod tests { #[tokio::test] async fn list_with_csv_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near", @@ -263,9 +263,12 @@ mod tests { #[tokio::test] async fn list_with_wildcard_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near", @@ -279,9 +282,12 @@ mod tests { #[tokio::test] async fn list_with_csv_and_wildcard_contracts() { let opts = Opts::test_opts_with_aws(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let list = find_index_files_by_pattern( - &opts.lake_aws_sdk_config(), + &s3_client, INDEXED_DATA_FILES_BUCKET, INDEXED_ACTIONS_FILES_FOLDER, "*.keypom.near, hackathon.agency.near, *.nearcrowd.near", @@ -322,7 +328,7 @@ mod tests { let s3_result = fetch_text_file_from_s3( format!("{}{}", LAKE_BUCKET_PREFIX, "mainnet").as_str(), "does_not_exist/block.json".to_string(), - s3_client, + &s3_client, ) .await; From bd8577f42d28e2a5f3e84c52b86dbb31b14b88d6 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 6 Nov 2023 15:49:01 +1300 Subject: [PATCH 03/11] refactor: Instantiate singleton `s3_client` --- .../src/historical_block_processing.rs | 41 ++++++++++--------- .../src/indexer_registry.rs | 2 + indexer/queryapi_coordinator/src/main.rs | 6 +++ 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index ebb3bbe18..234563293 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -3,8 +3,6 @@ use crate::opts::{Opts, Parser}; use crate::s3; use anyhow::{bail, Context}; use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; -use aws_types::SdkConfig; use chrono::{DateTime, LocalResult, TimeZone, Utc}; use indexer_rule_type::indexer_rule::MatchingRule; use indexer_rules_engine::types::indexer_rule_match::ChainId; @@ -24,8 +22,13 @@ pub fn spawn_historical_message_thread( block_height: BlockHeight, new_indexer_function: &IndexerFunction, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, ) -> Option> { let redis_connection_manager = redis_connection_manager.clone(); + let s3_client = s3_client.clone(); + let chain_id = chain_id.clone(); + new_indexer_function.start_block_height.map(|_| { let new_indexer_function_copy = new_indexer_function.clone(); tokio::spawn(async move { @@ -34,6 +37,8 @@ pub fn spawn_historical_message_thread( new_indexer_function_copy, Opts::parse(), &redis_connection_manager, + &s3_client, + &chain_id, ) .await }) @@ -45,12 +50,16 @@ pub(crate) async fn process_historical_messages_or_handle_error( indexer_function: IndexerFunction, opts: Opts, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, ) -> i64 { match process_historical_messages( block_height, indexer_function, opts, redis_connection_manager, + s3_client, + chain_id, ) .await { @@ -71,6 +80,8 @@ pub(crate) async fn process_historical_messages( indexer_function: IndexerFunction, opts: Opts, redis_connection_manager: &storage::ConnectionManager, + s3_client: &S3Client, + chain_id: &ChainId, ) -> anyhow::Result { let start_block = indexer_function.start_block_height.unwrap(); let block_difference: i64 = (block_height - start_block) as i64; @@ -93,23 +104,17 @@ pub(crate) async fn process_historical_messages( indexer_function.function_name ); - let chain_id = opts.chain_id().clone(); - let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); - - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); let start_date = lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; - let last_indexed_block = last_indexed_block_from_metadata(&s3_client).await?; + let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?; let last_indexed_block = last_indexed_block; let mut blocks_from_index = filter_matching_blocks_from_index_files( start_block, &indexer_function, - &s3_client, + s3_client, start_date, ) .await?; @@ -127,8 +132,8 @@ pub(crate) async fn process_historical_messages( last_indexed_block, block_height, &indexer_function, - aws_config, - chain_id.clone(), + s3_client, + chain_id, ) .await?; @@ -284,12 +289,10 @@ async fn filter_matching_unindexed_blocks_from_lake( last_indexed_block: BlockHeight, ending_block_height: BlockHeight, indexer_function: &IndexerFunction, - aws_config: &SdkConfig, - chain_id: ChainId, + s3_client: &S3Client, + chain_id: &ChainId, ) -> anyhow::Result> { - let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build(); - let s3_client: S3Client = S3Client::from_conf(s3_config); - let lake_bucket = lake_bucket_for_chain(chain_id.clone()); + let lake_bucket = lake_bucket_for_chain(chain_id); let indexer_rule = &indexer_function.indexer_rule; let count = ending_block_height - last_indexed_block; @@ -311,7 +314,7 @@ async fn filter_matching_unindexed_blocks_from_lake( for current_block in (last_indexed_block + 1)..ending_block_height { // fetch block file from S3 let key = format!("{}/block.json", normalize_block_height(current_block)); - let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await; + let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await; if s3_result.is_err() { let error = s3_result.err().unwrap(); @@ -381,7 +384,7 @@ async fn filter_matching_unindexed_blocks_from_lake( Ok(blocks_to_process) } -fn lake_bucket_for_chain(chain_id: ChainId) -> String { +fn lake_bucket_for_chain(chain_id: &ChainId) -> String { format!("{}{}", LAKE_BUCKET_PREFIX, chain_id) } diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index fa14ac49a..30561eb8c 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -173,6 +173,8 @@ fn index_and_process_register_calls( block_height, &new_indexer_function, context.redis_connection_manager, + context.s3_client, + context.chain_id, ) { spawned_start_from_block_threads.push(thread); diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index a67f5653a..a2988d176 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -42,6 +42,7 @@ pub(crate) struct QueryApiContext<'a> { pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage, pub chain_id: &'a ChainId, pub queue_client: &'a queue::QueueClient, + pub s3_client: &'a aws_sdk_s3::Client, pub queue_url: &'a str, pub registry_contract_id: &'a str, pub balance_cache: &'a BalanceCache, @@ -62,6 +63,10 @@ async fn main() -> anyhow::Result<()> { let queue_url = opts.queue_url.clone(); let registry_contract_id = opts.registry_contract_id.clone(); + let aws_config = &opts.lake_aws_sdk_config(); + let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + // We want to prevent unnecessary RPC queries to find previous balance let balances_cache: BalanceCache = std::sync::Arc::new(Mutex::new(SizedCache::with_size(100_000))); @@ -106,6 +111,7 @@ async fn main() -> anyhow::Result<()> { streamer_message, chain_id, queue_client: &queue_client, + s3_client: &s3_client, }; handle_streamer_message(context, indexer_registry.clone()) }) From ef2573a0259efbd27677185792cbc675ee63eedd Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 6 Nov 2023 15:53:55 +1300 Subject: [PATCH 04/11] refactor: Instantiate singleton `json_rpc_client` --- .../src/historical_block_processing.rs | 9 +++++++-- .../src/historical_block_processing_integration_tests.rs | 7 +++++++ indexer/queryapi_coordinator/src/indexer_registry.rs | 1 + indexer/queryapi_coordinator/src/main.rs | 2 ++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index 234563293..cae2a7793 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -24,10 +24,12 @@ pub fn spawn_historical_message_thread( redis_connection_manager: &storage::ConnectionManager, s3_client: &S3Client, chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> Option> { let redis_connection_manager = redis_connection_manager.clone(); let s3_client = s3_client.clone(); let chain_id = chain_id.clone(); + let json_rpc_client = json_rpc_client.clone(); new_indexer_function.start_block_height.map(|_| { let new_indexer_function_copy = new_indexer_function.clone(); @@ -39,6 +41,7 @@ pub fn spawn_historical_message_thread( &redis_connection_manager, &s3_client, &chain_id, + &json_rpc_client, ) .await }) @@ -52,6 +55,7 @@ pub(crate) async fn process_historical_messages_or_handle_error( redis_connection_manager: &storage::ConnectionManager, s3_client: &S3Client, chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> i64 { match process_historical_messages( block_height, @@ -60,6 +64,7 @@ pub(crate) async fn process_historical_messages_or_handle_error( redis_connection_manager, s3_client, chain_id, + json_rpc_client, ) .await { @@ -82,6 +87,7 @@ pub(crate) async fn process_historical_messages( redis_connection_manager: &storage::ConnectionManager, s3_client: &S3Client, chain_id: &ChainId, + json_rpc_client: &JsonRpcClient, ) -> anyhow::Result { let start_block = indexer_function.start_block_height.unwrap(); let block_difference: i64 = (block_height - start_block) as i64; @@ -104,9 +110,8 @@ pub(crate) async fn process_historical_messages( indexer_function.function_name ); - let json_rpc_client = JsonRpcClient::connect(opts.rpc_url()); let start_date = - lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?; + lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?; let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?; let last_indexed_block = last_indexed_block; diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 857b08be2..31310d720 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -77,12 +77,17 @@ mod tests { }; let opts = Opts::test_opts_with_aws(); + let aws_config: &SdkConfig = &opts.lake_aws_sdk_config(); let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + let redis_connection_manager = storage::connect(&opts.redis_connection_string) .await .unwrap(); + + let json_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url()); + let fake_block_height = historical_block_processing::last_indexed_block_from_metadata(&s3_client) .await @@ -92,6 +97,8 @@ mod tests { indexer_function, opts, &redis_connection_manager, + &s3_client, + &json_rpc_client, ) .await; assert!(result.unwrap() > 0); diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 30561eb8c..73aca2d0b 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -175,6 +175,7 @@ fn index_and_process_register_calls( context.redis_connection_manager, context.s3_client, context.chain_id, + context.json_rpc_client, ) { spawned_start_from_block_threads.push(thread); diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index a2988d176..da011ec9e 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -43,6 +43,7 @@ pub(crate) struct QueryApiContext<'a> { pub chain_id: &'a ChainId, pub queue_client: &'a queue::QueueClient, pub s3_client: &'a aws_sdk_s3::Client, + pub json_rpc_client: &'a JsonRpcClient, pub queue_url: &'a str, pub registry_contract_id: &'a str, pub balance_cache: &'a BalanceCache, @@ -111,6 +112,7 @@ async fn main() -> anyhow::Result<()> { streamer_message, chain_id, queue_client: &queue_client, + json_rpc_client: &json_rpc_client, s3_client: &s3_client, }; handle_streamer_message(context, indexer_registry.clone()) From 099220da82a1149f8f3b310a7a9181e08686b5e0 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 6 Nov 2023 16:09:01 +1300 Subject: [PATCH 05/11] refactor: Remove `Opts` reference from historical backfill --- .../queryapi_coordinator/src/historical_block_processing.rs | 5 ----- .../src/historical_block_processing_integration_tests.rs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index cae2a7793..ddbefae19 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -1,5 +1,4 @@ use crate::indexer_types::IndexerFunction; -use crate::opts::{Opts, Parser}; use crate::s3; use anyhow::{bail, Context}; use aws_sdk_s3::Client as S3Client; @@ -37,7 +36,6 @@ pub fn spawn_historical_message_thread( process_historical_messages_or_handle_error( block_height, new_indexer_function_copy, - Opts::parse(), &redis_connection_manager, &s3_client, &chain_id, @@ -51,7 +49,6 @@ pub fn spawn_historical_message_thread( pub(crate) async fn process_historical_messages_or_handle_error( block_height: BlockHeight, indexer_function: IndexerFunction, - opts: Opts, redis_connection_manager: &storage::ConnectionManager, s3_client: &S3Client, chain_id: &ChainId, @@ -60,7 +57,6 @@ pub(crate) async fn process_historical_messages_or_handle_error( match process_historical_messages( block_height, indexer_function, - opts, redis_connection_manager, s3_client, chain_id, @@ -83,7 +79,6 @@ pub(crate) async fn process_historical_messages_or_handle_error( pub(crate) async fn process_historical_messages( block_height: BlockHeight, indexer_function: IndexerFunction, - opts: Opts, redis_connection_manager: &storage::ConnectionManager, s3_client: &S3Client, chain_id: &ChainId, diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index 31310d720..f9b94185f 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -95,9 +95,9 @@ mod tests { let result = historical_block_processing::process_historical_messages( fake_block_height + 1, indexer_function, - opts, &redis_connection_manager, &s3_client, + &opts.chain_id(), &json_rpc_client, ) .await; From bb0ae0a8359fe3c27720da242b1abf04c90da1a8 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:01:16 +1300 Subject: [PATCH 06/11] fix: Needless borrow --- indexer/queryapi_coordinator/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index da011ec9e..1d18d948f 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -211,7 +211,7 @@ async fn handle_streamer_message( indexer_function_messages.push(msg); if !indexer_function.provisioned { - set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); + set_provisioned_flag(&mut indexer_registry_locked, indexer_function); } storage::sadd( From a4437ed6b433756a91ba6f5347d42df8f41c0eff Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:27:00 +1300 Subject: [PATCH 07/11] refactor: Remove real-time SQS messages --- indexer/queryapi_coordinator/src/main.rs | 39 +----------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 1d18d948f..d086649ad 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -185,9 +185,7 @@ async fn handle_streamer_message( let indexer_function = indexer_function_with_matches.indexer_function; let indexer_rule_matches = indexer_function_with_matches.matches; - let mut indexer_function_messages: Vec = Vec::new(); - - for indexer_rule_match in indexer_rule_matches.iter() { + for _ in indexer_rule_matches.iter() { tracing::debug!( target: INDEXER, "Matched filter {:?} for function {} {}", @@ -196,20 +194,6 @@ async fn handle_streamer_message( indexer_function.function_name, ); - let msg = IndexerQueueMessage { - chain_id: indexer_rule_match.chain_id.clone(), - indexer_rule_id: indexer_rule_match.indexer_rule_id.unwrap_or(0), - indexer_rule_name: indexer_rule_match - .indexer_rule_name - .clone() - .unwrap_or("".to_string()), - payload: Some(indexer_rule_match.payload.clone()), - block_height, - indexer_function: indexer_function.clone(), - is_historical: false, - }; - indexer_function_messages.push(msg); - if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, indexer_function); } @@ -234,27 +218,6 @@ async fn handle_streamer_message( ) .await?; } - - stream::iter(indexer_function_messages.into_iter()) - .chunks(10) - .for_each(|indexer_queue_messages_batch| async { - match queue::send_to_indexer_queue( - context.queue_client, - context.queue_url.to_string(), - indexer_queue_messages_batch, - ) - .await - { - Ok(_) => {} - Err(err) => tracing::error!( - target: INDEXER, - "#{} an error occurred during sending messages to the queue\n{:#?}", - context.streamer_message.block.header.height, - err - ), - } - }) - .await; } } From 36bbb9aadffeee051d4cca3f7ed4a9cec644a0cb Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:27:37 +1300 Subject: [PATCH 08/11] refactor: Remove SQS references from coordinator --- README.md | 3 +- indexer/Cargo.lock | 28 ------ indexer/README.md | 5 - indexer/queryapi_coordinator/Cargo.toml | 1 - indexer/queryapi_coordinator/README.md | 1 - ...ical_block_processing_integration_tests.rs | 5 - .../queryapi_coordinator/src/indexer_types.rs | 19 ---- indexer/queryapi_coordinator/src/main.rs | 91 +++++++++---------- indexer/queryapi_coordinator/src/opts.rs | 27 ------ indexer/queryapi_coordinator/src/queue.rs | 73 --------------- 10 files changed, 45 insertions(+), 208 deletions(-) delete mode 100644 indexer/queryapi_coordinator/src/queue.rs diff --git a/README.md b/README.md index 7112dd31d..6092f692b 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ With QueryApi you can ## 🧩 Components 1. [QueryApi Coordinator](./indexer) An Indexer that tracks changes to the QueryApi registry contract. It triggers the execution of those IndexerFunctions -when they match new blocks by placing messages on an SQS queue. Spawns historical processing threads when needed. +when they match new blocks by placing messages on a Redis Stream. Spawns historical processing threads when needed. 1.a. Subfolders provide crates for the different components of the Indexer: indexer_rule_type (shared with registry contract), indexer_rules_engine, storage. 2. [Indexer Runner](.indexer-js-queue-handler) @@ -70,7 +70,6 @@ docker compose up ### Local Configuration - Coordinator watches the dev registry contract by default (`dev-queryapi.dataplatform.near`). To use a different contract, you can update the `REGISTRY_CONTRACT_ID` environment variable. -- Coodinator will log SQS messages rather than sending them. To use an actual Queue, you can update the `QUEUE_URL` and `START_FROM_BLOCK_QUEUE_URL` environment variables. ### Known Issues diff --git a/indexer/Cargo.lock b/indexer/Cargo.lock index dede1fcb7..07f1f23a7 100644 --- a/indexer/Cargo.lock +++ b/indexer/Cargo.lock @@ -507,33 +507,6 @@ dependencies = [ "url", ] -[[package]] -name = "aws-sdk-sqs" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe1f563e227905539d5d1514e93a4c4e096366e1325ab24646783a3d6fe2c45" -dependencies = [ - "aws-credential-types", - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-json", - "aws-smithy-query", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "bytes", - "http", - "regex", - "tokio-stream", - "tower", - "url", -] - [[package]] name = "aws-sdk-sso" version = "0.23.0" @@ -3353,7 +3326,6 @@ dependencies = [ "anyhow", "aws-credential-types", "aws-sdk-s3", - "aws-sdk-sqs", "aws-types", "base64 0.13.1", "borsh 0.10.3", diff --git a/indexer/README.md b/indexer/README.md index 4c4c99890..948a6e0f5 100644 --- a/indexer/README.md +++ b/indexer/README.md @@ -30,8 +30,6 @@ This project is using `workspace` feature of Cargo. Some tests require blocks with matching data. To download the test block, run `./download_test_blocks.sh 93085141`. Some other useful blocks are 80854399 92476362 93085141 93659695. -To log a message instead of sending SQS messages set your `QUEUE_URL` to `MOCK` in your `.env` file. - ## Design concept Identified major types of the events on the network: @@ -46,9 +44,6 @@ Identified major types of the events on the network: DATABASE_URL=postgres://user:pass@host/database LAKE_AWS_ACCESS_KEY=AKI_LAKE_ACCESS... LAKE_AWS_SECRET_ACCESS_KEY=LAKE_SECRET... -QUEUE_AWS_ACCESS_KEY=AKI_SQS_ACCESS... -QUEUE_AWS_SECRET_ACCESS_KEY=SQS_ACCESS_SECRET -QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/754641474505/alertexer-queue ``` ## Running locally diff --git a/indexer/queryapi_coordinator/Cargo.toml b/indexer/queryapi_coordinator/Cargo.toml index 01c1c99d4..a32e48b9f 100644 --- a/indexer/queryapi_coordinator/Cargo.toml +++ b/indexer/queryapi_coordinator/Cargo.toml @@ -41,4 +41,3 @@ unescape = "0.1.0" aws-types = "0.53.0" aws-credential-types = "0.53.0" aws-sdk-s3 = "0.23.0" -aws-sdk-sqs = "0.23.0" diff --git a/indexer/queryapi_coordinator/README.md b/indexer/queryapi_coordinator/README.md index a267f4ab2..9cf8bd47f 100644 --- a/indexer/queryapi_coordinator/README.md +++ b/indexer/queryapi_coordinator/README.md @@ -17,4 +17,3 @@ see terraform scripts https://github.com/near/near-ops/tree/master/provisioning/ This app requires: * a connection to a database containing "alert" rules to match blocks against; * a redis server where identifiers of processed blocks are stored; - * a SQS queue to write to. \ No newline at end of file diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index f9b94185f..bb73bbe2a 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -20,11 +20,6 @@ mod tests { redis_connection_string: env::var("REDIS_CONNECTION_STRING").unwrap(), lake_aws_access_key, lake_aws_secret_access_key, - queue_aws_access_key: "".to_string(), - queue_aws_secret_access_key: "".to_string(), - aws_queue_region: "".to_string(), - queue_url: "MOCK".to_string(), - start_from_block_queue_url: "MOCK".to_string(), registry_contract_id: "".to_string(), port: 0, chain_id: ChainId::Mainnet(StartOptions::FromLatest), diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index f3880320b..ea075866f 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -1,28 +1,9 @@ use indexer_rule_type::indexer_rule::IndexerRule; -use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload}; use near_lake_framework::near_indexer_primitives::types::AccountId; use std::collections::HashMap; pub type IndexerRegistry = HashMap>; -#[derive( - borsh::BorshSerialize, - borsh::BorshDeserialize, - serde::Serialize, - serde::Deserialize, - Clone, - Debug, -)] -pub struct IndexerQueueMessage { - pub chain_id: ChainId, - pub indexer_rule_id: u32, - pub indexer_rule_name: String, - pub payload: Option, - pub block_height: u64, - pub indexer_function: IndexerFunction, - pub is_historical: bool, -} - #[derive( borsh::BorshSerialize, borsh::BorshDeserialize, diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index d086649ad..5e6b11231 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -9,7 +9,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use utils::serialize_to_camel_case_json_string; use crate::indexer_types::IndexerFunction; -use indexer_types::{IndexerQueueMessage, IndexerRegistry}; +use indexer_types::IndexerRegistry; use opts::{Opts, Parser}; use storage::{self, generate_real_time_streamer_message_key, ConnectionManager}; @@ -19,7 +19,6 @@ mod indexer_registry; mod indexer_types; mod metrics; mod opts; -mod queue; mod s3; mod utils; @@ -41,10 +40,8 @@ pub type BalanceCache = std::sync::Arc { pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage, pub chain_id: &'a ChainId, - pub queue_client: &'a queue::QueueClient, pub s3_client: &'a aws_sdk_s3::Client, pub json_rpc_client: &'a JsonRpcClient, - pub queue_url: &'a str, pub registry_contract_id: &'a str, pub balance_cache: &'a BalanceCache, pub redis_connection_manager: &'a ConnectionManager, @@ -59,9 +56,6 @@ async fn main() -> anyhow::Result<()> { let opts = Opts::parse(); let chain_id = &opts.chain_id(); - let aws_region = opts.aws_queue_region.clone(); - let queue_client = queue::queue_client(aws_region, opts.queue_credentials()); - let queue_url = opts.queue_url.clone(); let registry_contract_id = opts.registry_contract_id.clone(); let aws_config = &opts.lake_aws_sdk_config(); @@ -106,12 +100,10 @@ async fn main() -> anyhow::Result<()> { .map(|streamer_message| { let context = QueryApiContext { redis_connection_manager: &redis_connection_manager, - queue_url: &queue_url, balance_cache: &balances_cache, registry_contract_id: ®istry_contract_id, streamer_message, chain_id, - queue_client: &queue_client, json_rpc_client: &json_rpc_client, s3_client: &s3_client, }; @@ -303,48 +295,53 @@ async fn reduce_rule_matches_for_indexer_function<'x>( #[cfg(test)] mod historical_block_processing_integration_tests; -use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; -use std::collections::HashMap; - -#[tokio::test] -async fn set_provisioning_finds_functions_in_registry() { - let mut indexer_registry = IndexerRegistry::new(); - let indexer_function = IndexerFunction { - account_id: "test_near".to_string().parse().unwrap(), - function_name: "test_indexer".to_string(), - code: "".to_string(), - start_block_height: None, - schema: None, - provisioned: false, - indexer_rule: IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - id: None, - name: None, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "social.near".to_string(), - status: Status::Success, +#[cfg(test)] +mod tests { + use super::*; + use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; + use std::collections::HashMap; + + #[tokio::test] + async fn set_provisioning_finds_functions_in_registry() { + let mut indexer_registry = IndexerRegistry::new(); + let indexer_function = IndexerFunction { + account_id: "test_near".to_string().parse().unwrap(), + function_name: "test_indexer".to_string(), + code: "".to_string(), + start_block_height: None, + schema: None, + provisioned: false, + indexer_rule: IndexerRule { + indexer_rule_kind: IndexerRuleKind::Action, + id: None, + name: None, + matching_rule: MatchingRule::ActionAny { + affected_account_id: "social.near".to_string(), + status: Status::Success, + }, }, - }, - }; + }; - let mut functions: HashMap = HashMap::new(); - functions.insert( - indexer_function.function_name.clone(), - indexer_function.clone(), - ); - indexer_registry.insert(indexer_function.account_id.clone(), functions); + let mut functions: HashMap = HashMap::new(); + functions.insert( + indexer_function.function_name.clone(), + indexer_function.clone(), + ); + indexer_registry.insert(indexer_function.account_id.clone(), functions); - let indexer_registry: SharedIndexerRegistry = std::sync::Arc::new(Mutex::new(indexer_registry)); - let mut indexer_registry_locked = indexer_registry.lock().await; + let indexer_registry: SharedIndexerRegistry = + std::sync::Arc::new(Mutex::new(indexer_registry)); + let mut indexer_registry_locked = indexer_registry.lock().await; - set_provisioned_flag(&mut indexer_registry_locked, &&indexer_function); + set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); - let account_functions = indexer_registry_locked - .get(&indexer_function.account_id) - .unwrap(); - let indexer_function = account_functions - .get(&indexer_function.function_name) - .unwrap(); + let account_functions = indexer_registry_locked + .get(&indexer_function.account_id) + .unwrap(); + let indexer_function = account_functions + .get(&indexer_function.function_name) + .unwrap(); - assert!(indexer_function.provisioned); + assert!(indexer_function.provisioned); + } } diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs index af0fd4a11..b06394e75 100644 --- a/indexer/queryapi_coordinator/src/opts.rs +++ b/indexer/queryapi_coordinator/src/opts.rs @@ -26,21 +26,6 @@ pub struct Opts { #[clap(long, env)] /// AWS Secret Access Key with the rights to read from AWS S3 pub lake_aws_secret_access_key: String, - /// AWS Access Key with the rights to send messages to the `--queue-url` - #[clap(long, env)] - pub queue_aws_access_key: String, - /// AWS Secret Access Key with the rights to send messages to the `--queue-url` - #[clap(long, env)] - pub queue_aws_secret_access_key: String, - /// Which AWS region to use with the `--queue-url` - #[clap(long, env)] - pub aws_queue_region: String, - /// URL to the main AWS SQS queue backed by Queue Handler lambda - #[clap(long, env)] - pub queue_url: String, - /// URL to the AWS SQS queue for processing historical data - #[clap(long, env)] - pub start_from_block_queue_url: String, /// Registry contract to use #[clap(env)] pub registry_contract_id: String, @@ -98,18 +83,6 @@ impl Opts { aws_credential_types::provider::SharedCredentialsProvider::new(provider) } - // Creates AWS Credentials for SQS Queue - pub fn queue_credentials(&self) -> aws_credential_types::provider::SharedCredentialsProvider { - let provider = aws_credential_types::Credentials::new( - self.queue_aws_access_key.clone(), - self.queue_aws_secret_access_key.clone(), - None, - None, - "queryapi_coordinator_queue", - ); - aws_credential_types::provider::SharedCredentialsProvider::new(provider) - } - /// Creates AWS Shared Config for NEAR Lake pub fn lake_aws_sdk_config(&self) -> aws_types::sdk_config::SdkConfig { aws_types::sdk_config::SdkConfig::builder() diff --git a/indexer/queryapi_coordinator/src/queue.rs b/indexer/queryapi_coordinator/src/queue.rs deleted file mode 100644 index 1ff6fa5cb..000000000 --- a/indexer/queryapi_coordinator/src/queue.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::indexer_types::IndexerQueueMessage; -use aws_credential_types::provider::SharedCredentialsProvider; -pub use aws_sdk_sqs::{ - error::SendMessageError, model::SendMessageBatchRequestEntry, Client as QueueClient, Region, -}; - -pub const MOCK_QUEUE_URL: &str = "MOCK"; - -/// Creates AWS SQS Client for QueryApi SQS -pub fn queue_client(region: String, credentials: SharedCredentialsProvider) -> aws_sdk_sqs::Client { - let shared_config = queue_aws_sdk_config(region, credentials); - aws_sdk_sqs::Client::new(&shared_config) -} - -/// Creates AWS Shared Config for QueryApi SQS queue -pub fn queue_aws_sdk_config( - region: String, - credentials: SharedCredentialsProvider, -) -> aws_types::sdk_config::SdkConfig { - aws_types::sdk_config::SdkConfig::builder() - .credentials_provider(credentials) - .region(aws_types::region::Region::new(region)) - .build() -} - -pub async fn send_to_indexer_queue( - client: &aws_sdk_sqs::Client, - queue_url: String, - indexer_queue_messages: Vec, -) -> anyhow::Result<()> { - if queue_url == MOCK_QUEUE_URL { - for m in &indexer_queue_messages { - tracing::info!( - "Mock sending messages to SQS: {:?} {:?}", - m.indexer_function.function_name, - m.block_height - ); - } - return Ok(()); - } - - let message_bodies: Vec = indexer_queue_messages - .into_iter() - .enumerate() - .map(|(index, indexer_queue_message)| { - SendMessageBatchRequestEntry::builder() - .id(index.to_string()) - .message_body( - serde_json::to_string(&indexer_queue_message) - .expect("Failed to Json Serialize IndexerQueueMessage"), - ) - .message_group_id(format!( - "{}_{}", - indexer_queue_message.indexer_function.account_id, - indexer_queue_message.indexer_function.function_name - )) - .build() - }) - .collect(); - - let rsp = client - .send_message_batch() - .queue_url(queue_url) - .set_entries(Some(message_bodies)) - .send() - .await?; - tracing::debug!( - target: crate::INDEXER, - "Response from sending a message to SQS\n{:#?}", - rsp - ); - Ok(()) -} From 1bf6839097933562653987dab19d66a79e681db1 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:34:08 +1300 Subject: [PATCH 09/11] fix: Clippy errors --- .../src/historical_block_processing.rs | 4 ++-- ...ical_block_processing_integration_tests.rs | 6 ++--- indexer/queryapi_coordinator/src/main.rs | 22 ++----------------- indexer/queryapi_coordinator/src/opts.rs | 1 + indexer/queryapi_coordinator/src/s3.rs | 7 ++++-- indexer/queryapi_coordinator/src/utils.rs | 6 ++--- 6 files changed, 16 insertions(+), 30 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index ddbefae19..f745c3071 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -318,7 +318,7 @@ async fn filter_matching_unindexed_blocks_from_lake( if s3_result.is_err() { let error = s3_result.err().unwrap(); - if let Some(_) = error.downcast_ref::() { + if error.downcast_ref::().is_some() { tracing::info!( target: crate::INDEXER, "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}", @@ -345,7 +345,7 @@ async fn filter_matching_unindexed_blocks_from_lake( normalize_block_height(current_block), shard_id ); - let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, &s3_client).await?; + let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?; match serde_json::from_slice::( shard.as_ref(), ) { diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs index bb73bbe2a..86c281bb5 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs @@ -130,7 +130,7 @@ mod tests { let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 77016214; - let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 03) + let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 3) .unwrap() .and_hms_opt(0, 0, 0) .unwrap(); @@ -153,7 +153,7 @@ mod tests { } Err(e) => { println!("Error: {:?}", e); - assert!(false); + panic!(); } } } @@ -189,7 +189,7 @@ mod tests { let s3_client = aws_sdk_s3::Client::from_conf(s3_config); let start_block_height = 45894620; - let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 08, 01) + let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 8, 1) .unwrap() .and_hms_opt(0, 0, 0) .unwrap(); diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 5e6b11231..8c5cf09b9 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -1,11 +1,10 @@ -use cached::SizedCache; use futures::stream::{self, StreamExt}; use near_jsonrpc_client::JsonRpcClient; use tokio::sync::{Mutex, MutexGuard}; use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch}; -use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight}; -use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; +use near_lake_framework::near_indexer_primitives::types::BlockHeight; +use near_lake_framework::near_indexer_primitives::StreamerMessage; use utils::serialize_to_camel_case_json_string; use crate::indexer_types::IndexerFunction; @@ -23,27 +22,15 @@ mod s3; mod utils; pub(crate) const INDEXER: &str = "queryapi_coordinator"; -pub(crate) const INTERVAL: std::time::Duration = std::time::Duration::from_millis(100); -pub(crate) const MAX_DELAY_TIME: std::time::Duration = std::time::Duration::from_millis(4000); -pub(crate) const RETRY_COUNT: usize = 2; type SharedIndexerRegistry = std::sync::Arc>; -#[derive(Debug, Default, Clone, Copy)] -pub struct BalanceDetails { - pub non_staked: types::Balance, - pub staked: types::Balance, -} - -pub type BalanceCache = std::sync::Arc>>; - pub(crate) struct QueryApiContext<'a> { pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage, pub chain_id: &'a ChainId, pub s3_client: &'a aws_sdk_s3::Client, pub json_rpc_client: &'a JsonRpcClient, pub registry_contract_id: &'a str, - pub balance_cache: &'a BalanceCache, pub redis_connection_manager: &'a ConnectionManager, } @@ -62,10 +49,6 @@ async fn main() -> anyhow::Result<()> { let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build(); let s3_client = aws_sdk_s3::Client::from_conf(s3_config); - // We want to prevent unnecessary RPC queries to find previous balance - let balances_cache: BalanceCache = - std::sync::Arc::new(Mutex::new(SizedCache::with_size(100_000))); - tracing::info!(target: INDEXER, "Connecting to redis..."); let redis_connection_manager = storage::connect(&opts.redis_connection_string).await?; @@ -100,7 +83,6 @@ async fn main() -> anyhow::Result<()> { .map(|streamer_message| { let context = QueryApiContext { redis_connection_manager: &redis_connection_manager, - balance_cache: &balances_cache, registry_contract_id: ®istry_contract_id, streamer_message, chain_id, diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs index b06394e75..b99d580c8 100644 --- a/indexer/queryapi_coordinator/src/opts.rs +++ b/indexer/queryapi_coordinator/src/opts.rs @@ -46,6 +46,7 @@ pub enum ChainId { } #[derive(Subcommand, Debug, Clone)] +#[allow(clippy::enum_variant_names)] pub enum StartOptions { FromBlock { height: u64 }, FromInterruption, diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs index cf1defc67..9b7e8696b 100644 --- a/indexer/queryapi_coordinator/src/s3.rs +++ b/indexer/queryapi_coordinator/src/s3.rs @@ -214,7 +214,7 @@ mod tests { let list = list_s3_bucket_by_prefix( &s3_client, INDEXED_DATA_FILES_BUCKET, - &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()), + &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER), ) .await .unwrap(); @@ -335,7 +335,10 @@ mod tests { if s3_result.is_err() { let wrapped_error = s3_result.err().unwrap(); let error = wrapped_error.root_cause(); - if let Some(_) = error.downcast_ref::() { + if error + .downcast_ref::() + .is_some() + { success = true; } else { println!("Failed to downcast error: {:?}", error); diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 9f8c2585a..27830f9a3 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -64,7 +64,7 @@ pub(crate) fn serialize_to_camel_case_json_string( // Convert keys to Camel Case to_camel_case_keys(&mut message_value); - return serde_json::to_string(&message_value); + serde_json::to_string(&message_value) } fn to_camel_case_keys(message_value: &mut Value) { @@ -74,13 +74,13 @@ fn to_camel_case_keys(message_value: &mut Value) { for key in map.keys().cloned().collect::>() { // Generate Camel Case Key let new_key = key - .split("_") + .split('_') .enumerate() .map(|(i, str)| { if i > 0 { return str[..1].to_uppercase() + &str[1..]; } - return str.to_owned(); + str.to_owned() }) .collect::>() .join(""); From 6a079cd6df8d524c7acf41455325595bbfde1658 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:53:20 +1300 Subject: [PATCH 10/11] fix: Cargo format --- .../queryapi_coordinator/src/historical_block_processing.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index f745c3071..eae0484bc 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -318,7 +318,10 @@ async fn filter_matching_unindexed_blocks_from_lake( if s3_result.is_err() { let error = s3_result.err().unwrap(); - if error.downcast_ref::().is_some() { + if error + .downcast_ref::() + .is_some() + { tracing::info!( target: crate::INDEXER, "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}", From 4ad76005a376cc73f42ea96c8b052208677e183c Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 7 Nov 2023 09:55:25 +1300 Subject: [PATCH 11/11] fix: Clippy --- indexer/indexer_rules_engine/src/matcher.rs | 4 ++-- .../queryapi_coordinator/src/historical_block_processing.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/indexer/indexer_rules_engine/src/matcher.rs b/indexer/indexer_rules_engine/src/matcher.rs index 662e51161..864422db1 100644 --- a/indexer/indexer_rules_engine/src/matcher.rs +++ b/indexer/indexer_rules_engine/src/matcher.rs @@ -115,8 +115,8 @@ fn match_account( outcome_with_receipt: &IndexerExecutionOutcomeWithReceipt, ) -> bool { match account_id { - x if x.contains(",") => x - .split(",") + x if x.contains(',') => x + .split(',') .any(|sub_account_id| match_account(sub_account_id.trim(), outcome_with_receipt)), _ => { wildmatch::WildMatch::new(account_id).matches(&outcome_with_receipt.receipt.receiver_id) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index eae0484bc..73c0ab6d0 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -109,7 +109,6 @@ pub(crate) async fn process_historical_messages( lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?; let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?; - let last_indexed_block = last_indexed_block; let mut blocks_from_index = filter_matching_blocks_from_index_files( start_block,