Skip to content

Commit

Permalink
Refactor the code to make it easier to support multiple db backends i…
Browse files Browse the repository at this point in the history
…n the future.
  • Loading branch information
grao1991 committed Jun 11, 2024
1 parent 6255230 commit 3b02d57
Show file tree
Hide file tree
Showing 192 changed files with 194 additions and 249 deletions.
3 changes: 3 additions & 0 deletions rust/processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
### Use a custom parser

- Check our [indexer processors](https://github.com/aptos-labs/aptos-indexer-processors)!

### Manually running diesel-cli
- `cd` into the database folder under `src/db/`, then run it.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod models;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
models::{
db::common::models::{
object_models::v2_object_utils::ObjectWithMetadata,
user_transactions_models::user_transactions::UserTransaction,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
ans_utils::{get_token_name, NameRecordV2, SetReverseLookupEvent, SubdomainExtV2},
};
use crate::{
models::token_v2_models::v2_token_utils::TokenStandard,
db::common::models::token_v2_models::v2_token_utils::TokenStandard,
schema::{
ans_lookup_v2, ans_primary_name_v2, current_ans_lookup_v2, current_ans_primary_name_v2,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
models::default_models::move_resources::MoveResource,
db::common::models::default_models::move_resources::MoveResource,
utils::util::{
bigdecimal_to_u64, deserialize_from_string, parse_timestamp_secs, standardize_address,
truncate_str,
Expand Down Expand Up @@ -38,12 +38,6 @@ pub struct OptionalBigDecimal {
vec: Vec<BigDecimalWrapper>,
}

impl OptionalBigDecimal {
fn get_big_decimal(&self) -> Option<BigDecimal> {
self.vec.first().map(|x| x.0.clone())
}
}

pub fn get_token_name(domain_name: &str, subdomain_name: &str) -> String {
let domain = truncate_str(domain_name, DOMAIN_LENGTH);
let subdomain = truncate_str(subdomain_name, DOMAIN_LENGTH);
Expand Down Expand Up @@ -99,10 +93,6 @@ impl NameRecordV1 {
parse_timestamp_secs(bigdecimal_to_u64(&self.expiration_time_sec), 0)
}

pub fn get_property_version(&self) -> u64 {
bigdecimal_to_u64(&self.property_version)
}

pub fn get_target_address(&self) -> Option<String> {
self.target_address
.get_string()
Expand Down Expand Up @@ -309,44 +299,6 @@ impl SetReverseLookupEvent {
get_token_name(&domain, &subdomain)
}

pub fn get_curr_expiration_time(&self) -> Option<chrono::NaiveDateTime> {
self.curr_expiration_time_secs
.get_big_decimal()
.map(|x| parse_timestamp_secs(bigdecimal_to_u64(&x), 0))
}

pub fn get_prev_domain_trunc(&self) -> String {
truncate_str(
self.prev_domain_name
.get_string()
.unwrap_or_default()
.as_str(),
DOMAIN_LENGTH,
)
}

pub fn get_prev_subdomain_trunc(&self) -> String {
truncate_str(
self.prev_subdomain_name
.get_string()
.unwrap_or_default()
.as_str(),
DOMAIN_LENGTH,
)
}

pub fn get_prev_token_name(&self) -> String {
let domain = self.get_prev_domain_trunc();
let subdomain = self.get_prev_subdomain_trunc();
get_token_name(&domain, &subdomain)
}

pub fn get_prev_expiration_time(&self) -> Option<chrono::NaiveDateTime> {
self.prev_expiration_time_secs
.get_big_decimal()
.map(|x| parse_timestamp_secs(bigdecimal_to_u64(&x), 0))
}

pub fn from_event(
event: &Event,
ans_v2_contract_address: &str,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
coin_utils::{CoinEvent, EventGuidResource},
};
use crate::{
models::{
db::common::models::{
fungible_asset_models::{
v2_fungible_asset_activities::{
CoinType, CurrentCoinBalancePK, EventToCoinType, BURN_GAS_EVENT_CREATION_NUM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use super::coin_utils::{CoinInfoType, CoinResource};
use crate::{
models::fungible_asset_models::v2_fungible_asset_activities::EventToCoinType,
db::common::models::fungible_asset_models::v2_fungible_asset_activities::EventToCoinType,
schema::{coin_balances, current_coin_balances},
utils::util::standardize_address,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
models::default_models::move_tables::TableItem,
db::common::models::default_models::move_tables::TableItem,
schema::coin_supply,
utils::util::{hash_str, APTOS_COIN_TYPE_STR},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
models::default_models::move_resources::MoveResource,
db::common::models::default_models::move_resources::MoveResource,
utils::util::{deserialize_from_string, hash_str, standardize_address, truncate_str},
};
use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -78,13 +78,6 @@ pub struct IntegerWrapperResource {
pub vec: Vec<IntegerResource>,
}

impl IntegerWrapperResource {
/// In case we do want to track supply
pub fn get_supply(&self) -> Option<BigDecimal> {
self.vec.first().map(|inner| inner.value.clone())
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AggregatorResource {
pub handle: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use super::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent};
use crate::{
models::{
db::common::models::{
coin_models::{
coin_activities::CoinActivity,
coin_utils::{CoinEvent, CoinInfoType, EventGuidResource},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{
v2_fungible_asset_activities::EventToCoinType, v2_fungible_asset_utils::FungibleAssetStore,
};
use crate::{
models::{
db::common::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
models::{
db::common::models::{
coin_models::coin_utils::COIN_ADDR, default_models::move_resources::MoveResource,
token_models::token_utils::URI_LENGTH, token_v2_models::v2_token_utils::ResourceReference,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

use super::v2_fungible_asset_utils::FungibleAssetMetadata;
use crate::{
models::{
db::common::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#![allow(clippy::extra_unused_lifetimes)]

use crate::{schema::ledger_infos, utils::database::PgPoolConnection};
use crate::{schema::ledger_infos, utils::database::DbPoolConnection};
use diesel::{OptionalExtension, QueryDsl};
use diesel_async::RunQueryDsl;

Expand All @@ -15,7 +15,7 @@ pub struct LedgerInfo {
}

impl LedgerInfo {
pub async fn get(conn: &mut PgPoolConnection<'_>) -> diesel::QueryResult<Option<Self>> {
pub async fn get(conn: &mut DbPoolConnection<'_>) -> diesel::QueryResult<Option<Self>> {
ledger_infos::table
.select(ledger_infos::all_columns)
.first::<Self>(conn)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::unused_unit)]

use crate::{
models::{
db::common::models::{
default_models::move_resources::MoveResource,
fungible_asset_models::v2_fungible_asset_utils::{
ConcurrentFungibleAssetBalance, ConcurrentFungibleAssetSupply, FungibleAssetMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping};
use crate::{
models::default_models::move_resources::MoveResource,
db::common::models::default_models::move_resources::MoveResource,
schema::{current_objects, objects},
utils::{database::PgPoolConnection, util::standardize_address},
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Object {
txn_version: i64,
write_set_change_index: i64,
object_mapping: &AHashMap<CurrentObjectPK, CurrentObject>,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(Self, CurrentObject)>> {
Expand Down Expand Up @@ -169,7 +169,7 @@ impl Object {
/// This is actually not great because object owner can change. The best we can do now though.
/// This will loop forever until we get the object from the db
pub async fn get_current_object(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
object_address: &str,
query_retries: u32,
query_retry_delay_ms: u64,
Expand Down Expand Up @@ -205,7 +205,7 @@ impl CurrentObjectQuery {
/// TODO: Change this to a KV store
pub async fn get_by_address(
object_address: &str,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
) -> diesel::QueryResult<Self> {
current_objects::table
.filter(current_objects::object_address.eq(object_address))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#![allow(clippy::extra_unused_lifetimes)]

use crate::{schema::processor_status, utils::database::PgPoolConnection};
use crate::{schema::processor_status, utils::database::DbPoolConnection};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use diesel_async::RunQueryDsl;

Expand All @@ -29,7 +29,7 @@ pub struct ProcessorStatusQuery {
impl ProcessorStatusQuery {
pub async fn get_by_processor(
processor_name: &str,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
) -> diesel::QueryResult<Option<Self>> {
processor_status::table
.filter(processor_status::processor.eq(processor_name))
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
};
use crate::{
schema::current_delegated_voter,
utils::{database::PgPoolConnection, util::standardize_address},
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::WriteTableItem;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl CurrentDelegatedVoter {
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
vote_delegation_handle_to_pool_address: &VoteDelegationTableHandleToPool,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<CurrentDelegatedVoterMap> {
Expand Down Expand Up @@ -137,7 +137,7 @@ impl CurrentDelegatedVoter {
txn_timestamp: chrono::NaiveDateTime,
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
previous_delegated_voters: &CurrentDelegatedVoterMap,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<Self>> {
Expand Down Expand Up @@ -185,7 +185,7 @@ impl CurrentDelegatedVoter {
}

pub async fn get_delegation_pool_address_by_table_handle(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
table_handle: &str,
query_retries: u32,
query_retry_delay_ms: u64,
Expand All @@ -211,7 +211,7 @@ impl CurrentDelegatedVoter {
}

pub async fn get_existence_by_pk(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
delegator_address: &str,
delegation_pool_address: &str,
query_retries: u32,
Expand Down Expand Up @@ -242,7 +242,7 @@ impl CurrentDelegatedVoter {

impl CurrentDelegatedVoterQuery {
pub async fn get_by_table_handle(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
table_handle: &str,
) -> diesel::QueryResult<Self> {
current_delegated_voter::table
Expand All @@ -252,7 +252,7 @@ impl CurrentDelegatedVoterQuery {
}

pub async fn get_by_pk(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
delegator_address: &str,
delegation_pool_address: &str,
) -> diesel::QueryResult<Self> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata, PoolBalanceMetadata};
use crate::{
models::default_models::move_tables::TableItem,
db::common::models::default_models::move_tables::TableItem,
schema::{current_delegator_balances, delegator_balances},
utils::{database::PgPoolConnection, util::standardize_address},
utils::{database::DbPoolConnection, util::standardize_address},
};
use ahash::AHashMap;
use anyhow::Context;
Expand Down Expand Up @@ -133,7 +133,7 @@ impl CurrentDelegatorBalance {
write_set_change_index: i64,
inactive_pool_to_staking_pool: &ShareToStakingPoolMapping,
inactive_share_to_pool: &ShareToPoolMapping,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(DelegatorBalance, Self)>> {
Expand Down Expand Up @@ -257,7 +257,7 @@ impl CurrentDelegatorBalance {
write_set_change_index: i64,
inactive_pool_to_staking_pool: &ShareToStakingPoolMapping,
inactive_share_to_pool: &ShareToPoolMapping,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<(DelegatorBalance, Self)>> {
Expand Down Expand Up @@ -369,7 +369,7 @@ impl CurrentDelegatorBalance {
}

pub async fn get_staking_pool_from_inactive_share_handle(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
table_handle: &str,
query_retries: u32,
query_retry_delay_ms: u64,
Expand Down Expand Up @@ -397,7 +397,7 @@ impl CurrentDelegatorBalance {
pub async fn from_transaction(
transaction: &Transaction,
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<(Vec<DelegatorBalance>, CurrentDelegatorBalanceMap)> {
Expand Down Expand Up @@ -503,7 +503,7 @@ impl CurrentDelegatorBalance {

impl CurrentDelegatorBalanceQuery {
pub async fn get_by_inactive_share_handle(
conn: &mut PgPoolConnection<'_>,
conn: &mut DbPoolConnection<'_>,
table_handle: &str,
) -> diesel::QueryResult<Self> {
current_delegator_balances::table
Expand Down
Loading

0 comments on commit 3b02d57

Please sign in to comment.