refactor: Remove SQS references from Coordinator #365
@@ -1,15 +1,10 @@
-use crate::indexer_types::{IndexerFunction, IndexerQueueMessage};
-use crate::opts::{Opts, Parser};
-use crate::queue;
+use crate::indexer_types::IndexerFunction;
 use crate::s3;
 use anyhow::{bail, Context};
 use aws_sdk_s3::Client as S3Client;
-use aws_sdk_s3::Config;
-use aws_sdk_sqs::Client;
-use aws_types::SdkConfig;
 use chrono::{DateTime, LocalResult, TimeZone, Utc};
 use indexer_rule_type::indexer_rule::MatchingRule;
-use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
+use indexer_rules_engine::types::indexer_rule_match::ChainId;
 use near_jsonrpc_client::JsonRpcClient;
 use near_jsonrpc_primitives::types::blocks::RpcBlockRequest;
 use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference};

@@ -26,16 +21,25 @@ pub fn spawn_historical_message_thread(
 block_height: BlockHeight,
 new_indexer_function: &IndexerFunction,
 redis_connection_manager: &storage::ConnectionManager,
+s3_client: &S3Client,
+chain_id: &ChainId,
+json_rpc_client: &JsonRpcClient,
 ) -> Option<JoinHandle<i64>> {
 let redis_connection_manager = redis_connection_manager.clone();
+let s3_client = s3_client.clone();
+let chain_id = chain_id.clone();
+let json_rpc_client = json_rpc_client.clone();

 new_indexer_function.start_block_height.map(|_| {
 let new_indexer_function_copy = new_indexer_function.clone();
 tokio::spawn(async move {
 process_historical_messages_or_handle_error(
 block_height,
 new_indexer_function_copy,
-Opts::parse(),
 &redis_connection_manager,
+&s3_client,
+&chain_id,
+&json_rpc_client,
 )
 .await
 })

@@ -45,14 +49,18 @@ pub fn spawn_historical_message_thread(
 pub(crate) async fn process_historical_messages_or_handle_error(
 block_height: BlockHeight,
 indexer_function: IndexerFunction,
-opts: Opts,
 redis_connection_manager: &storage::ConnectionManager,
+s3_client: &S3Client,
+chain_id: &ChainId,
+json_rpc_client: &JsonRpcClient,
 ) -> i64 {
 match process_historical_messages(
 block_height,
 indexer_function,
-opts,
 redis_connection_manager,
+s3_client,
+chain_id,
+json_rpc_client,
 )
 .await
 {

@@ -71,8 +79,10 @@ pub(crate) async fn process_historical_messages_or_handle_error(
 pub(crate) async fn process_historical_messages(
 block_height: BlockHeight,
 indexer_function: IndexerFunction,
-opts: Opts,
 redis_connection_manager: &storage::ConnectionManager,
+s3_client: &S3Client,
+chain_id: &ChainId,
+json_rpc_client: &JsonRpcClient,
 ) -> anyhow::Result<i64> {
 let start_block = indexer_function.start_block_height.unwrap();
 let block_difference: i64 = (block_height - start_block) as i64;

@@ -95,25 +105,15 @@ pub(crate) async fn process_historical_messages(
 indexer_function.function_name
 );

-let chain_id = opts.chain_id().clone();
-let aws_region = opts.aws_queue_region.clone();
-let queue_client = queue::queue_client(aws_region, opts.queue_credentials());
-let queue_url = opts.start_from_block_queue_url.clone();
-let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
-
-let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
 let start_date =
-lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?;
+lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;

 let mut indexer_function = indexer_function.clone();

-let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?;
-let last_indexed_block = last_indexed_block;
+let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;

 let mut blocks_from_index = filter_matching_blocks_from_index_files(
 start_block,
 &indexer_function,
-aws_config,
+s3_client,
 start_date,
 )
 .await?;

@@ -131,15 +131,13 @@ pub(crate) async fn process_historical_messages(
 last_indexed_block,
 block_height,
 &indexer_function,
-aws_config,
-chain_id.clone(),
+s3_client,
+chain_id,
 )
 .await?;

 blocks_from_index.append(&mut blocks_between_indexed_and_current_block);

 let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);

 if !blocks_from_index.is_empty() {
 storage::sadd(
 redis_connection_manager,

@@ -163,30 +161,16 @@ pub(crate) async fn process_historical_messages(
 &[("block_height", current_block)],
 )
 .await?;

-send_execution_message(
-block_height,
-first_block_in_index,
-chain_id.clone(),
-&queue_client,
-queue_url.clone(),
-&mut indexer_function,
-current_block,
-None,
-)
-.await;
 }
 }
 }
 Ok(block_difference)
 }

 pub(crate) async fn last_indexed_block_from_metadata(
-aws_config: &SdkConfig,
+s3_client: &S3Client,
 ) -> anyhow::Result<BlockHeight> {
 let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
-let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-let s3_client: S3Client = S3Client::from_conf(s3_config);
 let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;

 let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();

@@ -207,7 +191,7 @@ pub(crate) async fn last_indexed_block_from_metadata(
 pub(crate) async fn filter_matching_blocks_from_index_files(
 start_block_height: BlockHeight,
 indexer_function: &IndexerFunction,
-aws_config: &SdkConfig,
+s3_client: &S3Client,
 start_date: DateTime<Utc>,
 ) -> anyhow::Result<Vec<BlockHeight>> {
 let s3_bucket = INDEXED_DATA_FILES_BUCKET;

@@ -224,7 +208,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
 needs_dedupe_and_sort = true;
 }
 s3::fetch_contract_index_files(
-aws_config,
+s3_client,
 s3_bucket,
 INDEXED_ACTIONS_FILES_FOLDER,
 start_date,

@@ -304,12 +288,10 @@ async fn filter_matching_unindexed_blocks_from_lake(
 last_indexed_block: BlockHeight,
 ending_block_height: BlockHeight,
 indexer_function: &IndexerFunction,
-aws_config: &SdkConfig,
-chain_id: ChainId,
+s3_client: &S3Client,
+chain_id: &ChainId,
 ) -> anyhow::Result<Vec<u64>> {
-let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-let s3_client: S3Client = S3Client::from_conf(s3_config);
-let lake_bucket = lake_bucket_for_chain(chain_id.clone());
+let lake_bucket = lake_bucket_for_chain(chain_id);

 let indexer_rule = &indexer_function.indexer_rule;
 let count = ending_block_height - last_indexed_block;

@@ -331,11 +313,14 @@ async fn filter_matching_unindexed_blocks_from_lake(
 for current_block in (last_indexed_block + 1)..ending_block_height {
 // fetch block file from S3
 let key = format!("{}/block.json", normalize_block_height(current_block));
-let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
+let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;

 if s3_result.is_err() {
 let error = s3_result.err().unwrap();
-if let Some(_) = error.downcast_ref::<aws_sdk_s3::error::NoSuchKey>() {
+if error
+.downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
+.is_some()
+{
Comment on lines +320 to +323:

Is this code changed to catch more errors?

Nah, it's just a clippy suggestion - it behaves the same.
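For reference, the suggestion in question is clippy's redundant_pattern_matching lint. Below is a minimal, self-contained sketch (using a stand-in NoSuchKey error type rather than the real aws_sdk_s3 one) showing that the two forms are interchangeable:

```rust
use std::fmt;

// Stand-in for aws_sdk_s3::error::NoSuchKey, purely for illustration.
#[derive(Debug)]
struct NoSuchKey;

impl fmt::Display for NoSuchKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "no such key")
    }
}

impl std::error::Error for NoSuchKey {}

fn main() {
    let error: Box<dyn std::error::Error> = Box::new(NoSuchKey);

    // Form flagged by clippy (`redundant_pattern_matching`): the `_` binding is never used.
    if let Some(_) = error.downcast_ref::<NoSuchKey>() {
        println!("missing block, skipping (if let)");
    }

    // Equivalent form suggested by clippy; the condition and behaviour are identical.
    if error.downcast_ref::<NoSuchKey>().is_some() {
        println!("missing block, skipping (is_some)");
    }
}
```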
 tracing::info!(
 target: crate::INDEXER,
 "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}",

@@ -362,7 +347,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
 normalize_block_height(current_block),
 shard_id
 );
-let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
+let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?;
 match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
 shard.as_ref(),
 ) {

@@ -401,50 +386,14 @@ async fn filter_matching_unindexed_blocks_from_lake(
 Ok(blocks_to_process)
 }

-fn lake_bucket_for_chain(chain_id: ChainId) -> String {
+fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
 format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
 }

 fn normalize_block_height(block_height: BlockHeight) -> String {
 format!("{:0>12}", block_height)
 }

-async fn send_execution_message(
-block_height: BlockHeight,
-first_block: BlockHeight,
-chain_id: ChainId,
-queue_client: &Client,
-queue_url: String,
-indexer_function: &mut IndexerFunction,
-current_block: u64,
-payload: Option<IndexerRuleMatchPayload>,
-) {
-// only request provisioning on the first block
-if current_block != first_block {
-indexer_function.provisioned = true;
-}
-
-let msg = IndexerQueueMessage {
-chain_id,
-indexer_rule_id: 0,
-indexer_rule_name: indexer_function.function_name.clone(),
-payload,
-block_height: current_block,
-indexer_function: indexer_function.clone(),
-is_historical: true,
-};
-
-match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await {
-Ok(_) => {}
-Err(err) => tracing::error!(
-target: crate::INDEXER,
-"#{} an error occurred when sending messages to the queue\n{:#?}",
-block_height,
-err
-),
-}
-}

 // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
 pub async fn lookup_block_date_or_next_block_date(
 block_height: u64,

Why do we clone these instead of using the referenced client directly?

tokio::spawn requires, and takes, ownership of the variables it references; this is signified by the move keyword. The compiler does not know how long the async task will run, and therefore cannot determine whether a reference would live long enough. Cloning allows us to take ownership of the value, which then means we can move it into the async task.
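A minimal sketch of that pattern, using an illustrative Client type standing in for the Coordinator's real S3 and JSON RPC clients:

```rust
use tokio::task::JoinHandle;

// Illustrative stand-in for the real clients; the important part is that it is Clone.
#[derive(Clone)]
struct Client {
    endpoint: String,
}

fn spawn_worker(client: &Client, block_height: u64) -> JoinHandle<u64> {
    // Clone while we still only hold a reference; the clone is an owned value...
    let client = client.clone();

    // ...which `async move` can take ownership of. Capturing `&Client` directly
    // would not compile: the spawned future must be 'static, and the compiler
    // cannot prove the reference outlives the task.
    tokio::spawn(async move {
        println!("processing block {} via {}", block_height, client.endpoint);
        block_height
    })
}

#[tokio::main]
async fn main() {
    let client = Client {
        endpoint: "https://rpc.example.org".to_string(),
    };
    let handle = spawn_worker(&client, 42);
    println!("processed block {}", handle.await.unwrap());
}
```

Both the AWS SDK clients and near_jsonrpc_client::JsonRpcClient are designed to be cheap to clone (they share their underlying connection state internally), so a clone per spawned task is the idiomatic way to satisfy the 'static bound on tokio::spawn.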