Skip to content

Commit

Permalink
[indexer] Make query retries and delay into parameters (#321)
Browse files Browse the repository at this point in the history
* optimize checking if fungible token logic

* move query retries and query cooldown between retries to parameters

* do not wait if we're done retrying

* lint
  • Loading branch information
bowenyang007 authored Mar 10, 2024
1 parent ff17f4a commit 6dd8014
Show file tree
Hide file tree
Showing 15 changed files with 291 additions and 102 deletions.
11 changes: 11 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use server_framework::RunnableConfig;
use std::time::Duration;
use url::Url;

pub const QUERY_DEFAULT_RETRIES: u32 = 5;
pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcProcessorConfig {
Expand All @@ -36,6 +39,14 @@ impl IndexerGrpcProcessorConfig {
pub const fn default_gap_detection_batch_size() -> u64 {
DEFAULT_GAP_DETECTION_BATCH_SIZE
}

pub const fn default_query_retries() -> u32 {
QUERY_DEFAULT_RETRIES
}

pub const fn default_query_retry_delay_ms() -> u64 {
QUERY_DEFAULT_RETRY_DELAY_MS
}
}

#[async_trait::async_trait]
Expand Down
43 changes: 21 additions & 22 deletions rust/processor/src/models/object_models/v2_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
models::{
default_models::move_resources::MoveResource,
token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
},
models::default_models::move_resources::MoveResource,
schema::{current_objects, objects},
utils::{database::PgPoolConnection, util::standardize_address},
};
Expand All @@ -21,7 +18,6 @@ use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};
use tracing::warn;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, write_set_change_index))]
Expand Down Expand Up @@ -111,6 +107,8 @@ impl Object {
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, CurrentObject>,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, CurrentObject)>> {
if delete_resource.type_str == "0x1::object::ObjectGroup" {
let resource = MoveResource::from_delete_resource(
Expand All @@ -122,7 +120,14 @@ impl Object {
let previous_object = if let Some(object) = object_mapping.get(&resource.address) {
object.clone()
} else {
match Self::get_current_object(conn, &resource.address, txn_version).await {
match Self::get_current_object(
conn,
&resource.address,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(object) => object,
Err(_) => {
tracing::error!(
Expand Down Expand Up @@ -166,11 +171,12 @@ impl Object {
pub async fn get_current_object(
conn: &mut PgPoolConnection<'_>,
object_address: &str,
transaction_version: i64,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<CurrentObject> {
let mut retries = 0;
while retries < QUERY_RETRIES {
retries += 1;
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentObjectQuery::get_by_address(object_address, conn).await {
Ok(res) => {
return Ok(CurrentObject {
Expand All @@ -183,18 +189,11 @@ impl Object {
is_deleted: res.is_deleted,
});
},
Err(e) => {
warn!(
transaction_version,
error = ?e,
object_address,
retry_ms = QUERY_RETRY_DELAY_MS,
"Failed to get object from current_objects table for object_address: {}, retrying in {} ms. ",
object_address,
QUERY_RETRY_DELAY_MS,
);
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
Err(_) => {
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Expand Down
44 changes: 31 additions & 13 deletions rust/processor/src/models/stake_models/current_delegated_voter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use super::{
stake_utils::VoteDelegationTableItem,
};
use crate::{
models::token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
schema::current_delegated_voter,
utils::{database::PgPoolConnection, util::standardize_address},
};
Expand Down Expand Up @@ -77,6 +76,8 @@ impl CurrentDelegatedVoter {
txn_timestamp: chrono::NaiveDateTime,
vote_delegation_handle_to_pool_address: &VoteDelegationTableHandleToPool,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<CurrentDelegatedVoterMap> {
let mut delegated_voter_map: CurrentDelegatedVoterMap = AHashMap::new();

Expand All @@ -93,7 +94,7 @@ impl CurrentDelegatedVoter {
Some(pool_address) => pool_address.clone(),
None => {
// look up from db
Self::get_delegation_pool_address_by_table_handle(conn, &table_handle).await
Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await
.unwrap_or_else(|_| {
tracing::error!(
transaction_version = txn_version,
Expand Down Expand Up @@ -137,6 +138,8 @@ impl CurrentDelegatedVoter {
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
previous_delegated_voters: &CurrentDelegatedVoterMap,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<Self>> {
if let Some((_, active_balance)) =
CurrentDelegatorBalance::get_active_share_from_write_table_item(
Expand All @@ -156,7 +159,14 @@ impl CurrentDelegatedVoter {
Some(_) => true,
None => {
// look up from db
Self::get_existence_by_pk(conn, &delegator_address, &pool_address).await
Self::get_existence_by_pk(
conn,
&delegator_address,
&pool_address,
query_retries,
query_retry_delay_ms,
)
.await
},
};
if !already_exists {
Expand All @@ -177,17 +187,21 @@ impl CurrentDelegatedVoter {
pub async fn get_delegation_pool_address_by_table_handle(
conn: &mut PgPoolConnection<'_>,
table_handle: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<String> {
let mut retried = 0;
while retried < QUERY_RETRIES {
retried += 1;
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentDelegatedVoterQuery::get_by_table_handle(conn, table_handle).await {
Ok(current_delegated_voter_query_result) => {
return Ok(current_delegated_voter_query_result.delegation_pool_address);
},
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Expand All @@ -200,10 +214,12 @@ impl CurrentDelegatedVoter {
conn: &mut PgPoolConnection<'_>,
delegator_address: &str,
delegation_pool_address: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> bool {
let mut retried = 0;
while retried < QUERY_RETRIES {
retried += 1;
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentDelegatedVoterQuery::get_by_pk(
conn,
delegator_address,
Expand All @@ -213,8 +229,10 @@ impl CurrentDelegatedVoter {
{
Ok(_) => return true,
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Expand Down
49 changes: 33 additions & 16 deletions rust/processor/src/models/stake_models/delegator_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata, PoolBalanceMetadata};
use crate::{
models::{
default_models::move_tables::TableItem,
token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS},
},
models::default_models::move_tables::TableItem,
schema::{current_delegator_balances, delegator_balances},
utils::{database::PgPoolConnection, util::standardize_address},
};
Expand Down Expand Up @@ -137,6 +134,8 @@ impl CurrentDelegatorBalance {
inactive_pool_to_staking_pool: &ShareToStakingPoolMapping,
inactive_share_to_pool: &ShareToPoolMapping,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(DelegatorBalance, Self)>> {
let table_handle = standardize_address(&write_table_item.handle.to_string());
// The mapping will tell us if the table item belongs to an inactive pool
Expand All @@ -153,6 +152,8 @@ impl CurrentDelegatorBalance {
match Self::get_staking_pool_from_inactive_share_handle(
conn,
&inactive_pool_handle,
query_retries,
query_retry_delay_ms,
)
.await
{
Expand Down Expand Up @@ -257,6 +258,8 @@ impl CurrentDelegatorBalance {
inactive_pool_to_staking_pool: &ShareToStakingPoolMapping,
inactive_share_to_pool: &ShareToPoolMapping,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(DelegatorBalance, Self)>> {
let table_handle = standardize_address(&delete_table_item.handle.to_string());
// The mapping will tell us if the table item belongs to an inactive pool
Expand All @@ -269,13 +272,17 @@ impl CurrentDelegatorBalance {
.map(|metadata| metadata.staking_pool_address.clone())
{
Some(pool_address) => pool_address,
None => {
Self::get_staking_pool_from_inactive_share_handle(conn, &inactive_pool_handle)
.await
.context(format!("Failed to get staking pool address from inactive share handle {}, txn version {}",
inactive_pool_handle, txn_version
))?
}
None => Self::get_staking_pool_from_inactive_share_handle(
conn,
&inactive_pool_handle,
query_retries,
query_retry_delay_ms,
)
.await
.context(format!(
"Failed to get staking pool from inactive share handle {}, txn version {}",
inactive_pool_handle, txn_version
))?,
};
let delegator_address = standardize_address(&delete_table_item.key.to_string());

Expand Down Expand Up @@ -364,17 +371,21 @@ impl CurrentDelegatorBalance {
pub async fn get_staking_pool_from_inactive_share_handle(
conn: &mut PgPoolConnection<'_>,
table_handle: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<String> {
let mut retried = 0;
while retried < QUERY_RETRIES {
retried += 1;
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentDelegatorBalanceQuery::get_by_inactive_share_handle(conn, table_handle)
.await
{
Ok(current_delegator_balance) => return Ok(current_delegator_balance.pool_address),
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Expand All @@ -387,6 +398,8 @@ impl CurrentDelegatorBalance {
transaction: &Transaction,
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<(Vec<DelegatorBalance>, CurrentDelegatorBalanceMap)> {
let mut inactive_pool_to_staking_pool: ShareToStakingPoolMapping = AHashMap::new();
let mut inactive_share_to_pool: ShareToPoolMapping = AHashMap::new();
Expand Down Expand Up @@ -436,6 +449,8 @@ impl CurrentDelegatorBalance {
&inactive_pool_to_staking_pool,
&inactive_share_to_pool,
conn,
query_retries,
query_retry_delay_ms,
)
.await
.unwrap()
Expand All @@ -461,6 +476,8 @@ impl CurrentDelegatorBalance {
&inactive_pool_to_staking_pool,
&inactive_share_to_pool,
conn,
query_retries,
query_retry_delay_ms,
)
.await
.unwrap()
Expand Down
28 changes: 19 additions & 9 deletions rust/processor/src/models/token_models/collection_datas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ use diesel_async::RunQueryDsl;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

pub const QUERY_RETRIES: u32 = 5;
pub const QUERY_RETRY_DELAY_MS: u64 = 500;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(collection_data_id_hash, transaction_version))]
#[diesel(table_name = collection_datas)]
Expand Down Expand Up @@ -89,6 +86,8 @@ impl CollectionData {
txn_timestamp: chrono::NaiveDateTime,
table_handle_to_owner: &TableHandleToOwner,
conn: &mut PgPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, CurrentCollectionData)>> {
let table_item_data = table_item.data.as_ref().unwrap();

Expand All @@ -107,7 +106,14 @@ impl CollectionData {
.map(|table_metadata| table_metadata.get_owner_address());
let mut creator_address = match maybe_creator_address {
Some(ca) => ca,
None => match Self::get_collection_creator(conn, &table_handle).await {
None => match Self::get_collection_creator(
conn,
&table_handle,
query_retries,
query_retry_delay_ms,
)
.await
{
Ok(creator) => creator,
Err(_) => {
tracing::error!(
Expand Down Expand Up @@ -169,15 +175,19 @@ impl CollectionData {
pub async fn get_collection_creator(
conn: &mut PgPoolConnection<'_>,
table_handle: &str,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<String> {
let mut retried = 0;
while retried < QUERY_RETRIES {
retried += 1;
let mut tried = 0;
while tried < query_retries {
tried += 1;
match CurrentCollectionDataQuery::get_by_table_handle(conn, table_handle).await {
Ok(current_collection_data) => return Ok(current_collection_data.creator_address),
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS))
.await;
if tried < query_retries {
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
.await;
}
},
}
}
Expand Down
Loading

0 comments on commit 6dd8014

Please sign in to comment.