From 48d779449caa5b10a51a6319c61992af0edb52ce Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Wed, 20 Mar 2024 20:12:27 -0700 Subject: [PATCH] Merge v1.10 into main (#328) * optimize checking if fungible token logic (#320) * [indexer] Make query retries and delay into parameters (#321) * optimize checking if fungible token logic * move query retries and query cooldown between retries to parameters * do not wait if we're done retrying * lint * lint --- rust/processor/src/config.rs | 11 +++ .../src/models/object_models/v2_objects.rs | 43 ++++---- .../stake_models/current_delegated_voter.rs | 44 ++++++--- .../models/stake_models/delegator_balances.rs | 49 +++++++--- .../models/token_models/collection_datas.rs | 28 ++++-- .../src/models/token_models/tokens.rs | 4 + .../models/token_v2_models/v2_collections.rs | 43 +++++--- .../token_v2_models/v2_token_activities.rs | 2 +- .../models/token_v2_models/v2_token_datas.rs | 98 +++++-------------- .../token_v2_models/v2_token_ownerships.rs | 26 ++--- .../processor/src/processors/ans_processor.rs | 4 +- rust/processor/src/processors/mod.rs | 12 +-- .../src/processors/nft_metadata_processor.rs | 22 ++++- .../src/processors/objects_processor.rs | 22 ++++- .../src/processors/stake_processor.rs | 27 ++++- .../src/processors/token_processor.rs | 18 +++- .../src/processors/token_v2_processor.rs | 36 ++++++- rust/processor/src/worker.rs | 26 +++-- 18 files changed, 329 insertions(+), 186 deletions(-) diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 35a5f8cb6..63c59e94f 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -12,6 +12,9 @@ use server_framework::RunnableConfig; use std::time::Duration; use url::Url; +pub const QUERY_DEFAULT_RETRIES: u32 = 5; +pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500; + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct IndexerGrpcProcessorConfig { @@ -49,6 +52,14 @@ impl IndexerGrpcProcessorConfig { DEFAULT_GAP_DETECTION_BATCH_SIZE } + pub const fn default_query_retries() -> u32 { + QUERY_DEFAULT_RETRIES + } + + pub const fn default_query_retry_delay_ms() -> u64 { + QUERY_DEFAULT_RETRY_DELAY_MS + } + /// Make the default very large on purpose so that by default it's not chunked /// This prevents any unexpected changes in behavior pub const fn default_pb_channel_txn_chunk_size() -> usize { diff --git a/rust/processor/src/models/object_models/v2_objects.rs b/rust/processor/src/models/object_models/v2_objects.rs index ad1fd605d..31381e157 100644 --- a/rust/processor/src/models/object_models/v2_objects.rs +++ b/rust/processor/src/models/object_models/v2_objects.rs @@ -7,10 +7,7 @@ use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; use crate::{ - models::{ - default_models::move_resources::MoveResource, - token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, - }, + models::default_models::move_resources::MoveResource, schema::{current_objects, objects}, utils::{database::PgPoolConnection, util::standardize_address}, }; @@ -21,7 +18,6 @@ use diesel::prelude::*; use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -use tracing::warn; #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] @@ -111,6 +107,8 @@ impl Object { write_set_change_index: i64, object_mapping: &AHashMap, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + 
query_retry_delay_ms: u64, ) -> anyhow::Result> { if delete_resource.type_str == "0x1::object::ObjectGroup" { let resource = MoveResource::from_delete_resource( @@ -122,7 +120,14 @@ impl Object { let previous_object = if let Some(object) = object_mapping.get(&resource.address) { object.clone() } else { - match Self::get_current_object(conn, &resource.address, txn_version).await { + match Self::get_current_object( + conn, + &resource.address, + query_retries, + query_retry_delay_ms, + ) + .await + { Ok(object) => object, Err(_) => { tracing::error!( @@ -166,11 +171,12 @@ impl Object { pub async fn get_current_object( conn: &mut PgPoolConnection<'_>, object_address: &str, - transaction_version: i64, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retries = 0; - while retries < QUERY_RETRIES { - retries += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match CurrentObjectQuery::get_by_address(object_address, conn).await { Ok(res) => { return Ok(CurrentObject { @@ -183,18 +189,11 @@ impl Object { is_deleted: res.is_deleted, }); }, - Err(e) => { - warn!( - transaction_version, - error = ?e, - object_address, - retry_ms = QUERY_RETRY_DELAY_MS, - "Failed to get object from current_objects table for object_address: {}, retrying in {} ms. ", - object_address, - QUERY_RETRY_DELAY_MS, - ); - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + Err(_) => { + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } diff --git a/rust/processor/src/models/stake_models/current_delegated_voter.rs b/rust/processor/src/models/stake_models/current_delegated_voter.rs index 772216eb1..8915d83dd 100644 --- a/rust/processor/src/models/stake_models/current_delegated_voter.rs +++ b/rust/processor/src/models/stake_models/current_delegated_voter.rs @@ -9,7 +9,6 @@ use super::{ stake_utils::VoteDelegationTableItem, }; use crate::{ - models::token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, schema::current_delegated_voter, utils::{database::PgPoolConnection, util::standardize_address}, }; @@ -77,6 +76,8 @@ impl CurrentDelegatedVoter { txn_timestamp: chrono::NaiveDateTime, vote_delegation_handle_to_pool_address: &VoteDelegationTableHandleToPool, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { let mut delegated_voter_map: CurrentDelegatedVoterMap = AHashMap::new(); @@ -93,7 +94,7 @@ impl CurrentDelegatedVoter { Some(pool_address) => pool_address.clone(), None => { // look up from db - Self::get_delegation_pool_address_by_table_handle(conn, &table_handle).await + Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await .unwrap_or_else(|_| { tracing::error!( transaction_version = txn_version, @@ -137,6 +138,8 @@ impl CurrentDelegatedVoter { active_pool_to_staking_pool: &ShareToStakingPoolMapping, previous_delegated_voters: &CurrentDelegatedVoterMap, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { if let Some((_, active_balance)) = CurrentDelegatorBalance::get_active_share_from_write_table_item( @@ -156,7 +159,14 @@ impl CurrentDelegatedVoter { Some(_) => true, None => { // look up from db - Self::get_existence_by_pk(conn, &delegator_address, &pool_address).await + Self::get_existence_by_pk( + conn, + &delegator_address, + &pool_address, + query_retries, + 
query_retry_delay_ms, + ) + .await }, }; if !already_exists { @@ -177,17 +187,21 @@ impl CurrentDelegatedVoter { pub async fn get_delegation_pool_address_by_table_handle( conn: &mut PgPoolConnection<'_>, table_handle: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match CurrentDelegatedVoterQuery::get_by_table_handle(conn, table_handle).await { Ok(current_delegated_voter_query_result) => { return Ok(current_delegated_voter_query_result.delegation_pool_address); }, Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } @@ -200,10 +214,12 @@ impl CurrentDelegatedVoter { conn: &mut PgPoolConnection<'_>, delegator_address: &str, delegation_pool_address: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> bool { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match CurrentDelegatedVoterQuery::get_by_pk( conn, delegator_address, @@ -213,8 +229,10 @@ impl CurrentDelegatedVoter { { Ok(_) => return true, Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } diff --git a/rust/processor/src/models/stake_models/delegator_balances.rs b/rust/processor/src/models/stake_models/delegator_balances.rs index 916c3d7ff..0293574a6 100644 --- a/rust/processor/src/models/stake_models/delegator_balances.rs +++ b/rust/processor/src/models/stake_models/delegator_balances.rs @@ -5,10 +5,7 @@ use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata, PoolBalanceMetadata}; use crate::{ - models::{ - default_models::move_tables::TableItem, - token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, - }, + models::default_models::move_tables::TableItem, schema::{current_delegator_balances, delegator_balances}, utils::{database::PgPoolConnection, util::standardize_address}, }; @@ -137,6 +134,8 @@ impl CurrentDelegatorBalance { inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, inactive_share_to_pool: &ShareToPoolMapping, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { let table_handle = standardize_address(&write_table_item.handle.to_string()); // The mapping will tell us if the table item belongs to an inactive pool @@ -153,6 +152,8 @@ impl CurrentDelegatorBalance { match Self::get_staking_pool_from_inactive_share_handle( conn, &inactive_pool_handle, + query_retries, + query_retry_delay_ms, ) .await { @@ -257,6 +258,8 @@ impl CurrentDelegatorBalance { inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, inactive_share_to_pool: &ShareToPoolMapping, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { let table_handle = standardize_address(&delete_table_item.handle.to_string()); // The mapping will tell us if the table item belongs to an inactive pool @@ -269,13 +272,17 @@ impl CurrentDelegatorBalance { .map(|metadata| metadata.staking_pool_address.clone()) { Some(pool_address) => pool_address, - None => { - Self::get_staking_pool_from_inactive_share_handle(conn, 
&inactive_pool_handle) - .await - .context(format!("Failed to get staking pool address from inactive share handle {}, txn version {}", - inactive_pool_handle, txn_version - ))? - } + None => Self::get_staking_pool_from_inactive_share_handle( + conn, + &inactive_pool_handle, + query_retries, + query_retry_delay_ms, + ) + .await + .context(format!( + "Failed to get staking pool from inactive share handle {}, txn version {}", + inactive_pool_handle, txn_version + ))?, }; let delegator_address = standardize_address(&delete_table_item.key.to_string()); @@ -364,17 +371,21 @@ impl CurrentDelegatorBalance { pub async fn get_staking_pool_from_inactive_share_handle( conn: &mut PgPoolConnection<'_>, table_handle: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match CurrentDelegatorBalanceQuery::get_by_inactive_share_handle(conn, table_handle) .await { Ok(current_delegator_balance) => return Ok(current_delegator_balance.pool_address), Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } @@ -387,6 +398,8 @@ impl CurrentDelegatorBalance { transaction: &Transaction, active_pool_to_staking_pool: &ShareToStakingPoolMapping, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result<(Vec, CurrentDelegatorBalanceMap)> { let mut inactive_pool_to_staking_pool: ShareToStakingPoolMapping = AHashMap::new(); let mut inactive_share_to_pool: ShareToPoolMapping = AHashMap::new(); @@ -436,6 +449,8 @@ impl CurrentDelegatorBalance { &inactive_pool_to_staking_pool, &inactive_share_to_pool, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() @@ -461,6 +476,8 @@ impl CurrentDelegatorBalance { &inactive_pool_to_staking_pool, &inactive_share_to_pool, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() diff --git a/rust/processor/src/models/token_models/collection_datas.rs b/rust/processor/src/models/token_models/collection_datas.rs index 5ba8d7fd6..57444dac4 100644 --- a/rust/processor/src/models/token_models/collection_datas.rs +++ b/rust/processor/src/models/token_models/collection_datas.rs @@ -20,9 +20,6 @@ use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -pub const QUERY_RETRIES: u32 = 5; -pub const QUERY_RETRY_DELAY_MS: u64 = 500; - #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(collection_data_id_hash, transaction_version))] #[diesel(table_name = collection_datas)] @@ -89,6 +86,8 @@ impl CollectionData { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { let table_item_data = table_item.data.as_ref().unwrap(); @@ -107,7 +106,14 @@ impl CollectionData { .map(|table_metadata| table_metadata.get_owner_address()); let mut creator_address = match maybe_creator_address { Some(ca) => ca, - None => match Self::get_collection_creator(conn, &table_handle).await { + None => match Self::get_collection_creator( + conn, + &table_handle, + query_retries, + query_retry_delay_ms, + ) + .await + { Ok(creator) => creator, Err(_) => { tracing::error!( @@ -169,15 +175,19 @@ impl 
CollectionData { pub async fn get_collection_creator( conn: &mut PgPoolConnection<'_>, table_handle: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match CurrentCollectionDataQuery::get_by_table_handle(conn, table_handle).await { Ok(current_collection_data) => return Ok(current_collection_data.creator_address), Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } diff --git a/rust/processor/src/models/token_models/tokens.rs b/rust/processor/src/models/token_models/tokens.rs index 38728b2ef..2f244e9b9 100644 --- a/rust/processor/src/models/token_models/tokens.rs +++ b/rust/processor/src/models/token_models/tokens.rs @@ -73,6 +73,8 @@ impl Token { transaction: &Transaction, table_handle_to_owner: &TableHandleToOwner, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> ( Vec, Vec, @@ -156,6 +158,8 @@ impl Token { txn_timestamp, table_handle_to_owner, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap(), diff --git a/rust/processor/src/models/token_v2_models/v2_collections.rs b/rust/processor/src/models/token_v2_models/v2_collections.rs index 601547fdf..f40f4551f 100644 --- a/rust/processor/src/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/models/token_v2_models/v2_collections.rs @@ -11,7 +11,7 @@ use crate::{ default_models::move_resources::MoveResource, object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::{ - collection_datas::{CollectionData, QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, + collection_datas::CollectionData, token_utils::{CollectionDataIdType, TokenWriteSet}, tokens::TableHandleToOwner, }, @@ -201,6 +201,8 @@ impl CollectionV2 { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { let table_item_data = table_item.data.as_ref().unwrap(); @@ -220,16 +222,27 @@ impl CollectionV2 { let mut creator_address = match maybe_creator_address { Some(ca) => ca, None => { - match Self::get_collection_creator_for_v1(conn, &table_handle) - .await - .context(format!( - "Failed to get collection creator for table handle {}, txn version {}", - table_handle, txn_version - )) { + match Self::get_collection_creator_for_v1( + conn, + &table_handle, + query_retries, + query_retry_delay_ms, + ) + .await + .context(format!( + "Failed to get collection creator for table handle {}, txn version {}", + table_handle, txn_version + )) { Ok(ca) => ca, Err(_) => { // Try our best by getting from the older collection data - match CollectionData::get_collection_creator(conn, &table_handle).await + match CollectionData::get_collection_creator( + conn, + &table_handle, + query_retries, + query_retry_delay_ms, + ) + .await { Ok(creator) => creator, Err(_) => { @@ -298,15 +311,19 @@ impl CollectionV2 { async fn get_collection_creator_for_v1( conn: &mut PgPoolConnection<'_>, table_handle: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match Self::get_by_table_handle(conn, table_handle).await { 
Ok(creator) => return Ok(creator), Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } diff --git a/rust/processor/src/models/token_v2_models/v2_token_activities.rs b/rust/processor/src/models/token_v2_models/v2_token_activities.rs index b311bc0c2..a74674b6d 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_activities.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_activities.rs @@ -93,7 +93,7 @@ impl TokenActivityV2 { let fungible_asset = object_data.fungible_asset_store.as_ref().unwrap(); let token_data_id = fungible_asset.metadata.get_reference_address(); // Exit early if it's not a token - if !TokenDataV2::is_address_fungible_token( + if !TokenDataV2::is_address_token( conn, &token_data_id, object_metadatas, diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index b1c497ddc..10ed2a944 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -9,10 +9,7 @@ use super::v2_token_utils::{TokenStandard, TokenV2}; use crate::{ models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_models::{ - collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, - token_utils::TokenWriteSet, - }, + token_models::token_utils::TokenWriteSet, }, schema::{current_token_datas_v2, token_datas_v2}, utils::{database::PgPoolConnection, util::standardize_address}, @@ -259,94 +256,45 @@ impl TokenDataV2 { /// A fungible asset can also be a token. We will make a best effort guess at whether this is a fungible token. /// 1. If metadata is present with a token object, then is a token /// 2. If metadata is not present, we will do a lookup in the db. - pub async fn is_address_fungible_token( + pub async fn is_address_token( conn: &mut PgPoolConnection<'_>, token_data_id: &str, object_aggregated_data_mapping: &ObjectAggregatedDataMapping, txn_version: i64, ) -> bool { - // 1. If metadata is present, the object is a token iff token struct is also present in the object if let Some(object_data) = object_aggregated_data_mapping.get(token_data_id) { - if object_data.fungible_asset_metadata.is_some() { - return object_data.token.is_some(); - } + return object_data.token.is_some(); } - // 2. If metadata is not present, we will do a lookup in the db. - match CurrentTokenDataV2::get_current_token_data_v2(conn, txn_version, token_data_id).await - { - Ok(token_data) => { - if let Some(is_fungible_v2) = token_data.is_fungible_v2 { - return is_fungible_v2; - } - // If is_fungible_v2 is null, that's likely because it's a v1 token, which are not fungible - false + match CurrentTokenDataV2Query::get_exists(conn, token_data_id).await { + Ok(is_token) => is_token, + Err(e) => { + // TODO: Standardize this error handling + panic!("Version: {}, error {:?}", txn_version, e) }, - Err(_) => { - tracing::error!( - transaction_version = txn_version, - lookup_key = token_data_id, - "Missing current_token_data_v2 for token_data_id: {}. 
You probably should backfill db.", - token_data_id, - ); - // Default - false - }, - } - } -} - -impl CurrentTokenDataV2 { - pub async fn get_current_token_data_v2( - conn: &mut PgPoolConnection<'_>, - txn_version: i64, - token_data_id: &str, - ) -> anyhow::Result { - let mut retries = 0; - while retries < QUERY_RETRIES { - retries += 1; - match CurrentTokenDataV2Query::get_by_token_data_id(conn, token_data_id).await { - Ok(res) => { - return Ok(CurrentTokenDataV2 { - token_data_id: res.token_data_id, - collection_id: res.collection_id, - token_name: res.token_name, - maximum: res.maximum, - supply: res.supply, - largest_property_version_v1: res.largest_property_version_v1, - token_uri: res.token_uri, - token_properties: res.token_properties, - description: res.description, - token_standard: res.token_standard, - is_fungible_v2: res.is_fungible_v2, - last_transaction_version: res.last_transaction_version, - last_transaction_timestamp: res.last_transaction_timestamp, - decimals: res.decimals, - }); - }, - Err(_) => { - tracing::error!( - transaction_version = txn_version, - lookup_key = token_data_id, - "Missing current_token_data_v2 for token_data_id: {}. You probably should backfill db.", - token_data_id, - ); - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; - }, - } } - Err(anyhow::anyhow!("Failed to get token data")) } } impl CurrentTokenDataV2Query { - pub async fn get_by_token_data_id( + /// TODO: change this to diesel exists. Also this only checks once so may miss some data if coming from another thread + pub async fn get_exists( conn: &mut PgPoolConnection<'_>, token_data_id: &str, - ) -> diesel::QueryResult { - current_token_datas_v2::table + ) -> anyhow::Result { + match current_token_datas_v2::table .filter(current_token_datas_v2::token_data_id.eq(token_data_id)) .first::(conn) .await + .optional() + { + Ok(result) => { + if result.is_some() { + Ok(true) + } else { + Ok(false) + } + }, + Err(e) => anyhow::bail!("Error checking if token_data_id exists: {:?}", e), + } } } diff --git a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs index e7059ac0c..f0a3c926d 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs @@ -14,11 +14,7 @@ use crate::{ default_models::move_resources::MoveResource, fungible_asset_models::v2_fungible_asset_utils::V2FungibleAssetResource, object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, - token_models::{ - collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, - token_utils::TokenWriteSet, - tokens::TableHandleToOwner, - }, + token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, token_v2_models::v2_token_utils::DEFAULT_OWNER_ADDRESS, }, schema::{current_token_ownerships_v2, token_ownerships_v2}, @@ -297,6 +293,8 @@ impl TokenOwnershipV2 { prior_nft_ownership: &AHashMap, tokens_burned: &TokenV2Burned, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result> { let token_address = standardize_address(&write_resource.address.to_string()); if let Some(burn_event) = tokens_burned.get(&token_address) { @@ -312,6 +310,8 @@ impl TokenOwnershipV2 { match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( conn, &token_address, + query_retries, + query_retry_delay_ms, ) .await { @@ -400,7 +400,7 @@ impl TokenOwnershipV2 { let object_core = 
&object_data.object.object_core; let token_data_id = inner.metadata.get_reference_address(); // Exit early if it's not a token - if !TokenDataV2::is_address_fungible_token( + if !TokenDataV2::is_address_token( conn, &token_data_id, object_metadatas, @@ -616,10 +616,12 @@ impl CurrentTokenOwnershipV2Query { pub async fn get_latest_owned_nft_by_token_data_id( conn: &mut PgPoolConnection<'_>, token_data_id: &str, + query_retries: u32, + query_retry_delay_ms: u64, ) -> anyhow::Result { - let mut retried = 0; - while retried < QUERY_RETRIES { - retried += 1; + let mut tried = 0; + while tried < query_retries { + tried += 1; match Self::get_latest_owned_nft_by_token_data_id_impl(conn, token_data_id).await { Ok(inner) => { return Ok(NFTOwnershipV2 { @@ -629,8 +631,10 @@ impl CurrentTokenOwnershipV2Query { }); }, Err(_) => { - tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)) - .await; + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } }, } } diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index a448ac3c6..54fb87ea1 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -49,8 +49,8 @@ pub struct AnsProcessor { impl AnsProcessor { pub fn new( connection_pool: PgDbPool, - per_table_chunk_sizes: AHashMap, config: AnsProcessorConfig, + per_table_chunk_sizes: AHashMap, ) -> Self { tracing::info!( ans_v1_primary_names_table_handle = config.ans_v1_primary_names_table_handle, @@ -60,8 +60,8 @@ impl AnsProcessor { ); Self { connection_pool, - per_table_chunk_sizes, config, + per_table_chunk_sizes, } } } diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index cb2349eb2..f85d9de7f 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -31,10 +31,10 @@ use self::{ fungible_asset_processor::FungibleAssetProcessor, monitoring_processor::MonitoringProcessor, nft_metadata_processor::{NftMetadataProcessor, NftMetadataProcessorConfig}, - objects_processor::ObjectsProcessor, - stake_processor::StakeProcessor, + objects_processor::{ObjectsProcessor, ObjectsProcessorConfig}, + stake_processor::{StakeProcessor, StakeProcessorConfig}, token_processor::{TokenProcessor, TokenProcessorConfig}, - token_v2_processor::TokenV2Processor, + token_v2_processor::{TokenV2Processor, TokenV2ProcessorConfig}, transaction_metadata_processor::TransactionMetadataProcessor, user_transaction_processor::UserTransactionProcessor, }; @@ -187,10 +187,10 @@ pub enum ProcessorConfig { FungibleAssetProcessor, MonitoringProcessor, NftMetadataProcessor(NftMetadataProcessorConfig), - ObjectsProcessor, - StakeProcessor, + ObjectsProcessor(ObjectsProcessorConfig), + StakeProcessor(StakeProcessorConfig), TokenProcessor(TokenProcessorConfig), - TokenV2Processor, + TokenV2Processor(TokenV2ProcessorConfig), TransactionMetadataProcessor, UserTransactionProcessor, } diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index a7b8cd328..792878d91 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -17,6 +17,7 @@ use crate::{ database::{PgDbPool, PgPoolConnection}, util::{parse_timestamp, remove_null_bytes, standardize_address}, }, + IndexerGrpcProcessorConfig, }; use ahash::AHashMap; use 
aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; @@ -38,6 +39,10 @@ pub const CHUNK_SIZE: usize = 1000; pub struct NftMetadataProcessorConfig { pub pubsub_topic_name: String, pub google_application_credentials: Option, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")] + pub query_retries: u32, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")] + pub query_retry_delay_ms: u64, } pub struct NftMetadataProcessor { @@ -96,6 +101,9 @@ impl ProcessorTrait for NftMetadataProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let mut conn = self.get_conn().await; + let query_retries = self.config.query_retries; + let query_retry_delay_ms = self.config.query_retry_delay_ms; + let db_chain_id = db_chain_id.unwrap_or_else(|| { error!("[NFT Metadata Crawler] db_chain_id must not be null"); panic!(); @@ -114,8 +122,14 @@ impl ProcessorTrait for NftMetadataProcessor { let ordering_key = get_current_timestamp(); // Publish CurrentTokenDataV2 and CurrentCollectionV2 from transactions - let (token_datas, collections) = - parse_v2_token(&transactions, &table_handle_to_owner, &mut conn).await; + let (token_datas, collections) = parse_v2_token( + &transactions, + &table_handle_to_owner, + &mut conn, + query_retries, + query_retry_delay_ms, + ) + .await; let mut pubsub_messages: Vec = Vec::with_capacity(token_datas.len() + collections.len()); @@ -204,6 +218,8 @@ async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: &TableHandleToOwner, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> (Vec, Vec) { let mut current_token_datas_v2: AHashMap = AHashMap::new(); @@ -266,6 +282,8 @@ async fn parse_v2_token( txn_timestamp, table_handle_to_owner, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 269fd9bdd..13736299f 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -12,6 +12,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::standardize_address, }, + IndexerGrpcProcessorConfig, }; use ahash::AHashMap; use anyhow::bail; @@ -22,18 +23,33 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectsProcessorConfig { + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")] + pub query_retries: u32, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")] + pub query_retry_delay_ms: u64, +} pub struct ObjectsProcessor { connection_pool: PgDbPool, + config: ObjectsProcessorConfig, per_table_chunk_sizes: AHashMap, } impl ObjectsProcessor { - pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: PgDbPool, + config: ObjectsProcessorConfig, + per_table_chunk_sizes: AHashMap, + ) -> Self { Self { connection_pool, + config, per_table_chunk_sizes, } } @@ -146,6 +162,8 @@ impl ProcessorTrait for ObjectsProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let mut conn = self.get_conn().await; + let query_retries = self.config.query_retries; + let query_retry_delay_ms = 
self.config.query_retry_delay_ms; // Moving object handling here because we need a single object // map through transactions for lookups @@ -220,6 +238,8 @@ impl ProcessorTrait for ObjectsProcessor { index, &all_current_objects, &mut conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() diff --git a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index 774d3b05c..07d9d185d 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -21,6 +21,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::{parse_timestamp, standardize_address}, }, + IndexerGrpcProcessorConfig, }; use ahash::AHashMap; use anyhow::bail; @@ -31,18 +32,34 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct StakeProcessorConfig { + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")] + pub query_retries: u32, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")] + pub query_retry_delay_ms: u64, +} + pub struct StakeProcessor { connection_pool: PgDbPool, + config: StakeProcessorConfig, per_table_chunk_sizes: AHashMap, } impl StakeProcessor { - pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: PgDbPool, + config: StakeProcessorConfig, + per_table_chunk_sizes: AHashMap, + ) -> Self { Self { connection_pool, + config, per_table_chunk_sizes, } } @@ -381,6 +398,8 @@ impl ProcessorTrait for StakeProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let mut conn = self.get_conn().await; + let query_retries = self.config.query_retries; + let query_retry_delay_ms = self.config.query_retry_delay_ms; let mut all_current_stake_pool_voters: StakingPoolVoterMap = AHashMap::new(); let mut all_proposal_votes = vec![]; @@ -455,6 +474,8 @@ impl ProcessorTrait for StakeProcessor { txn, &active_pool_to_staking_pool, &mut conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap(); @@ -470,6 +491,8 @@ impl ProcessorTrait for StakeProcessor { txn_timestamp, &all_vote_delegation_handle_to_pool_address, &mut conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap(); @@ -489,6 +512,8 @@ impl ProcessorTrait for StakeProcessor { &active_pool_to_staking_pool, &all_current_delegated_voter, &mut conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() diff --git a/rust/processor/src/processors/token_processor.rs b/rust/processor/src/processors/token_processor.rs index c383d5ff5..27f08e7b2 100644 --- a/rust/processor/src/processors/token_processor.rs +++ b/rust/processor/src/processors/token_processor.rs @@ -17,6 +17,7 @@ use crate::{ }, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, + IndexerGrpcProcessorConfig, }; use ahash::AHashMap; use anyhow::bail; @@ -35,6 +36,10 @@ use tracing::error; #[serde(deny_unknown_fields)] pub struct TokenProcessorConfig { pub nft_points_contract: Option, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")] + pub query_retries: u32, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")] + pub query_retry_delay_ms: u64, } pub struct TokenProcessor { @@ -46,8 +51,8 @@ pub struct TokenProcessor { impl TokenProcessor { pub fn 
new( connection_pool: PgDbPool, - per_table_chunk_sizes: AHashMap, config: TokenProcessorConfig, + per_table_chunk_sizes: AHashMap, ) -> Self { Self { connection_pool, @@ -434,6 +439,8 @@ impl ProcessorTrait for TokenProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let mut conn = self.get_conn().await; + let query_retries = self.config.query_retries; + let query_retry_delay_ms = self.config.query_retry_delay_ms; // First get all token related table metadata from the batch of transactions. This is in case // an earlier transaction has metadata (in resources) that's missing from a later transaction. @@ -474,7 +481,14 @@ impl ProcessorTrait for TokenProcessor { current_token_datas, current_collection_datas, current_token_claims, - ) = Token::from_transaction(txn, &table_handle_to_owner, &mut conn).await; + ) = Token::from_transaction( + txn, + &table_handle_to_owner, + &mut conn, + query_retries, + query_retry_delay_ms, + ) + .await; all_tokens.append(&mut tokens); all_token_ownerships.append(&mut token_ownerships); all_token_datas.append(&mut token_datas); diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 34931bdb7..7975f24a9 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -33,6 +33,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection}, util::{get_entry_function_from_user_request, parse_timestamp, standardize_address}, }, + IndexerGrpcProcessorConfig, }; use ahash::{AHashMap, AHashSet}; use anyhow::bail; @@ -43,18 +44,34 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct TokenV2ProcessorConfig { + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")] + pub query_retries: u32, + #[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")] + pub query_retry_delay_ms: u64, +} + pub struct TokenV2Processor { connection_pool: PgDbPool, + config: TokenV2ProcessorConfig, per_table_chunk_sizes: AHashMap, } impl TokenV2Processor { - pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: PgDbPool, + config: TokenV2ProcessorConfig, + per_table_chunk_sizes: AHashMap, + ) -> Self { Self { connection_pool, + config, per_table_chunk_sizes, } } @@ -434,6 +451,8 @@ impl ProcessorTrait for TokenV2Processor { let table_handle_to_owner = TableMetadataForToken::get_table_handle_to_owner_from_transactions(&transactions); + let query_retries = self.config.query_retries; + let query_retry_delay_ms = self.config.query_retry_delay_ms; // Token V2 processing which includes token v1 let ( collections_v2, @@ -445,7 +464,14 @@ impl ProcessorTrait for TokenV2Processor { current_deleted_token_ownerships_v2, token_activities_v2, current_token_v2_metadata, - ) = parse_v2_token(&transactions, &table_handle_to_owner, &mut conn).await; + ) = parse_v2_token( + &transactions, + &table_handle_to_owner, + &mut conn, + query_retries, + query_retry_delay_ms, + ) + .await; let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -499,6 +525,8 @@ async fn parse_v2_token( transactions: &[Transaction], table_handle_to_owner: 
&TableHandleToOwner, conn: &mut PgPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, ) -> ( Vec, Vec, @@ -738,6 +766,8 @@ async fn parse_v2_token( txn_timestamp, table_handle_to_owner, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() @@ -972,6 +1002,8 @@ async fn parse_v2_token( &prior_nft_ownership, &tokens_burned, conn, + query_retries, + query_retry_delay_ms, ) .await .unwrap() diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index b3305221b..e54cd0df5 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -670,8 +670,8 @@ pub fn build_processor( ), ProcessorConfig::AnsProcessor(config) => Processor::from(AnsProcessor::new( db_pool, - per_table_chunk_sizes, config.clone(), + per_table_chunk_sizes, )), ProcessorConfig::CoinProcessor => { Processor::from(CoinProcessor::new(db_pool, per_table_chunk_sizes)) @@ -689,20 +689,26 @@ pub fn build_processor( ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) }, - ProcessorConfig::ObjectsProcessor => { - Processor::from(ObjectsProcessor::new(db_pool, per_table_chunk_sizes)) - }, - ProcessorConfig::StakeProcessor => { - Processor::from(StakeProcessor::new(db_pool, per_table_chunk_sizes)) - }, + ProcessorConfig::ObjectsProcessor(config) => Processor::from(ObjectsProcessor::new( + db_pool, + config.clone(), + per_table_chunk_sizes, + )), + ProcessorConfig::StakeProcessor(config) => Processor::from(StakeProcessor::new( + db_pool, + config.clone(), + per_table_chunk_sizes, + )), ProcessorConfig::TokenProcessor(config) => Processor::from(TokenProcessor::new( db_pool, + config.clone(), per_table_chunk_sizes, + )), + ProcessorConfig::TokenV2Processor(config) => Processor::from(TokenV2Processor::new( + db_pool, config.clone(), + per_table_chunk_sizes, )), - ProcessorConfig::TokenV2Processor => { - Processor::from(TokenV2Processor::new(db_pool, per_table_chunk_sizes)) - }, ProcessorConfig::TransactionMetadataProcessor => Processor::from( TransactionMetadataProcessor::new(db_pool, per_table_chunk_sizes), ),
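
Below is a minimal, self-contained sketch of the retry pattern this patch threads through the lookup helpers (get_collection_creator, get_current_object, get_staking_pool_from_inactive_share_handle, and friends): query_retries and query_retry_delay_ms are now passed in as parameters instead of the old QUERY_RETRIES / QUERY_RETRY_DELAY_MS constants, and the loop only sleeps between attempts, never after the final one. The helper fetch_creator, the error strings, and the main entry point here are illustrative stand-ins rather than code from the patch; the default values mirror QUERY_DEFAULT_RETRIES and QUERY_DEFAULT_RETRY_DELAY_MS added in config.rs, and the sketch assumes the crate's existing tokio and anyhow dependencies.

use anyhow::anyhow;

// Placeholder for a diesel-async query against the indexer DB, e.g.
// CurrentCollectionDataQuery::get_by_table_handle.
async fn fetch_creator(table_handle: &str) -> anyhow::Result<String> {
    Err(anyhow!("not found: {table_handle}"))
}

async fn get_collection_creator(
    table_handle: &str,
    query_retries: u32,
    query_retry_delay_ms: u64,
) -> anyhow::Result<String> {
    let mut tried = 0;
    while tried < query_retries {
        tried += 1;
        match fetch_creator(table_handle).await {
            Ok(creator) => return Ok(creator),
            Err(_) => {
                // Only back off if another attempt is coming; the last failure
                // falls through immediately ("do not wait if we're done retrying").
                if tried < query_retries {
                    tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
                        .await;
                }
            },
        }
    }
    Err(anyhow!("Failed to get collection creator"))
}

#[tokio::main]
async fn main() {
    // 5 retries / 500 ms mirror the defaults introduced in config.rs.
    let result = get_collection_creator("0xabc", 5, 500).await;
    println!("{result:?}");
}

Skipping the sleep after the final attempt means a processor that has exhausted its retries surfaces the failure right away instead of paying one extra query_retry_delay_ms per failed lookup, which is the behavior change called out in the commit message.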