diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index 3e0c98d42..6cf3e7533 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -4,4 +4,5 @@ pub mod default_models; pub mod event_models; pub mod fungible_asset_models; pub mod object_models; +pub mod stake_models; pub mod token_v2_models; diff --git a/rust/processor/src/db/postgres/models/stake_models/current_delegated_voter.rs b/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs similarity index 96% rename from rust/processor/src/db/postgres/models/stake_models/current_delegated_voter.rs rename to rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs index c04343df1..261da4dfe 100644 --- a/rust/processor/src/db/postgres/models/stake_models/current_delegated_voter.rs +++ b/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs @@ -4,11 +4,11 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::{ - delegator_balances::{CurrentDelegatorBalance, ShareToStakingPoolMapping}, - stake_utils::VoteDelegationTableItem, -}; +use super::delegator_balances::ShareToRawStakingPoolMapping; use crate::{ + db::common::models::stake_models::{ + delegator_balances::RawCurrentDelegatorBalance, stake_utils::VoteDelegationTableItem, + }, schema::current_delegated_voter, utils::{database::DbPoolConnection, util::standardize_address}, }; @@ -135,14 +135,14 @@ impl CurrentDelegatedVoter { write_table_item: &WriteTableItem, txn_version: i64, txn_timestamp: chrono::NaiveDateTime, - active_pool_to_staking_pool: &ShareToStakingPoolMapping, + active_pool_to_staking_pool: &ShareToRawStakingPoolMapping, previous_delegated_voters: &CurrentDelegatedVoterMap, conn: &mut DbPoolConnection<'_>, query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { if let Some((_, active_balance)) = - CurrentDelegatorBalance::get_active_share_from_write_table_item( + RawCurrentDelegatorBalance::get_active_share_from_write_table_item( write_table_item, txn_version, 0, // placeholder diff --git a/rust/processor/src/db/common/models/stake_models/delegator_activities.rs b/rust/processor/src/db/common/models/stake_models/delegator_activities.rs new file mode 100644 index 000000000..ef81b2bd1 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/delegator_activities.rs @@ -0,0 +1,115 @@ +// Copyright © Aptos Foundation + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use crate::{ + db::common::models::stake_models::stake_utils::StakeEvent, + utils::{ + counters::PROCESSOR_UNKNOWN_TYPE_COUNT, + util::{parse_timestamp, standardize_address, u64_to_bigdecimal}, + }, +}; +use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; +use bigdecimal::BigDecimal; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawDelegatedStakingActivity { + pub transaction_version: i64, + pub event_index: i64, + pub delegator_address: String, + pub pool_address: String, + pub event_type: String, + pub amount: BigDecimal, + pub block_timestamp: chrono::NaiveDateTime, +} +pub trait RawDelegatedStakingActivityConvertible { + fn from_raw(raw: RawDelegatedStakingActivity) -> Self; +} + +impl RawDelegatedStakingActivity { + /// Pretty straightforward parsing from known delegated staking events + pub fn from_transaction(transaction: &Transaction) -> anyhow::Result> { + let mut delegator_activities = vec![]; + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["DelegatedStakingActivity"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return Ok(delegator_activities); + }, + }; + + let txn_version = transaction.version as i64; + let events = match txn_data { + TxnData::User(txn) => &txn.events, + TxnData::BlockMetadata(txn) => &txn.events, + TxnData::Validator(txn) => &txn.events, + _ => return Ok(delegator_activities), + }; + let block_timestamp = parse_timestamp(transaction.timestamp.as_ref().unwrap(), txn_version); + for (index, event) in events.iter().enumerate() { + let event_index = index as i64; + if let Some(staking_event) = + StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? + { + let activity = match staking_event { + StakeEvent::AddStakeEvent(inner) => RawDelegatedStakingActivity { + transaction_version: txn_version, + event_index, + delegator_address: standardize_address(&inner.delegator_address), + pool_address: standardize_address(&inner.pool_address), + event_type: event.type_str.clone(), + amount: u64_to_bigdecimal(inner.amount_added), + block_timestamp, + }, + StakeEvent::UnlockStakeEvent(inner) => RawDelegatedStakingActivity { + transaction_version: txn_version, + event_index, + delegator_address: standardize_address(&inner.delegator_address), + pool_address: standardize_address(&inner.pool_address), + event_type: event.type_str.clone(), + amount: u64_to_bigdecimal(inner.amount_unlocked), + block_timestamp, + }, + StakeEvent::WithdrawStakeEvent(inner) => RawDelegatedStakingActivity { + transaction_version: txn_version, + event_index, + delegator_address: standardize_address(&inner.delegator_address), + pool_address: standardize_address(&inner.pool_address), + event_type: event.type_str.clone(), + amount: u64_to_bigdecimal(inner.amount_withdrawn), + block_timestamp, + }, + StakeEvent::ReactivateStakeEvent(inner) => RawDelegatedStakingActivity { + transaction_version: txn_version, + event_index, + delegator_address: standardize_address(&inner.delegator_address), + pool_address: standardize_address(&inner.pool_address), + event_type: event.type_str.clone(), + amount: u64_to_bigdecimal(inner.amount_reactivated), + block_timestamp, + }, + StakeEvent::DistributeRewardsEvent(inner) => RawDelegatedStakingActivity { + transaction_version: txn_version, + event_index, + delegator_address: "".to_string(), + pool_address: standardize_address(&inner.pool_address), + event_type: event.type_str.clone(), + amount: u64_to_bigdecimal(inner.rewards_amount), + block_timestamp, + }, + _ => continue, + }; + delegator_activities.push(activity); + } + } + Ok(delegator_activities) + } +} diff --git a/rust/processor/src/db/common/models/stake_models/delegator_balances.rs b/rust/processor/src/db/common/models/stake_models/delegator_balances.rs new file mode 100644 index 000000000..c625a9349 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/delegator_balances.rs @@ -0,0 +1,558 @@ +// Copyright © Aptos Foundation + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +use crate::{ + db::{ + common::models::{ + default_models::raw_table_items::RawTableItem, + stake_models::delegator_pools::{ + DelegatorPool, RawDelegatorPoolBalanceMetadata, RawPoolBalanceMetadata, + }, + }, + postgres::models::default_models::move_tables::TableItem, + }, + schema::current_delegator_balances, + utils::{ + database::DbPoolConnection, + util::{parse_timestamp, standardize_address}, + }, +}; +use ahash::AHashMap; +use anyhow::Context; +use aptos_protos::transaction::v1::{ + write_set_change::Change, DeleteTableItem, Transaction, WriteResource, WriteTableItem, +}; +use bigdecimal::{BigDecimal, Zero}; +use chrono::NaiveDateTime; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use serde::{Deserialize, Serialize}; + +pub type TableHandle = String; +pub type Address = String; +pub type ShareToRawStakingPoolMapping = AHashMap; +pub type ShareToRawPoolMapping = AHashMap; +pub type RawCurrentDelegatorBalancePK = (Address, Address, String); +pub type RawCurrentDelegatorBalanceMap = + AHashMap; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawCurrentDelegatorBalance { + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub last_transaction_version: i64, + pub shares: BigDecimal, + pub parent_table_handle: String, + pub block_timestamp: chrono::NaiveDateTime, +} + +pub trait RawCurrentDelegatorBalanceConvertible { + fn from_raw(raw: RawCurrentDelegatorBalance) -> Self; +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawDelegatorBalance { + pub transaction_version: i64, + pub write_set_change_index: i64, + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub shares: BigDecimal, + pub parent_table_handle: String, + pub block_timestamp: chrono::NaiveDateTime, +} + +pub trait RawDelegatorBalanceConvertible { + fn from_raw(raw: RawDelegatorBalance) -> Self; +} + +#[derive(Debug, Identifiable, Queryable)] +#[diesel(primary_key(delegator_address, pool_address, pool_type))] +#[diesel(table_name = current_delegator_balances)] +pub struct CurrentDelegatorBalanceQuery { + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub last_transaction_version: i64, + pub inserted_at: chrono::NaiveDateTime, + pub shares: BigDecimal, + pub parent_table_handle: String, +} + +impl RawCurrentDelegatorBalance { + /// Getting active share balances. Only 1 active pool per staking pool tracked in a single table + pub async fn get_active_share_from_write_table_item( + write_table_item: &WriteTableItem, + txn_version: i64, + write_set_change_index: i64, + active_pool_to_staking_pool: &ShareToRawStakingPoolMapping, + block_timestamp: NaiveDateTime, + ) -> anyhow::Result> { + let table_handle = standardize_address(&write_table_item.handle.to_string()); + // The mapping will tell us if the table item is an active share table + if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { + let pool_address = pool_balance.staking_pool_address.clone(); + let delegator_address = standardize_address(&write_table_item.key.to_string()); + + // Convert to TableItem model. Some fields are just placeholders + let table_item_model: TableItem = RawTableItem::postgres_table_item_from_write_item( + write_table_item, + 0, + txn_version, + 0, + block_timestamp, + ); + + let shares: BigDecimal = table_item_model + .decoded_value + .as_ref() + .unwrap() + .as_str() + .unwrap() + .parse::() + .context(format!( + "cannot parse string as u128: {:?}, version {}", + table_item_model.decoded_value.as_ref(), + txn_version + ))?; + let shares = shares / &pool_balance.scaling_factor; + Ok(Some(( + RawDelegatorBalance { + transaction_version: txn_version, + write_set_change_index, + delegator_address: delegator_address.clone(), + pool_address: pool_address.clone(), + pool_type: "active_shares".to_string(), + table_handle: table_handle.clone(), + shares: shares.clone(), + parent_table_handle: table_handle.clone(), + block_timestamp, + }, + Self { + delegator_address, + pool_address, + pool_type: "active_shares".to_string(), + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares, + parent_table_handle: table_handle, + block_timestamp, + }, + ))) + } else { + Ok(None) + } + } + + /// Getting inactive share balances. There could be multiple inactive pool per staking pool so we have + /// 2 layers of mapping (table w/ all inactive pools -> staking pool, table w/ delegator inactive shares -> each inactive pool) + pub async fn get_inactive_share_from_write_table_item( + write_table_item: &WriteTableItem, + txn_version: i64, + write_set_change_index: i64, + inactive_pool_to_staking_pool: &ShareToRawStakingPoolMapping, + inactive_share_to_pool: &ShareToRawPoolMapping, + conn: &mut DbPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, + block_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + let table_handle = standardize_address(&write_table_item.handle.to_string()); + // The mapping will tell us if the table item belongs to an inactive pool + if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { + // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool + let inactive_pool_handle = pool_balance.parent_table_handle.clone(); + + let pool_address = match inactive_pool_to_staking_pool + .get(&inactive_pool_handle) + .map(|metadata| metadata.staking_pool_address.clone()) + { + Some(pool_address) => pool_address, + None => { + match Self::get_staking_pool_from_inactive_share_handle( + conn, + &inactive_pool_handle, + query_retries, + query_retry_delay_ms, + ) + .await + { + Ok(pool) => pool, + Err(_) => { + tracing::error!( + transaction_version = txn_version, + lookup_key = &inactive_pool_handle, + "Failed to get staking pool address from inactive share handle. You probably should backfill db.", + ); + return Ok(None); + }, + } + }, + }; + let delegator_address = standardize_address(&write_table_item.key.to_string()); + // Convert to TableItem model. Some fields are just placeholders + let table_item_model = RawTableItem::postgres_table_item_from_write_item( + write_table_item, + 0, + txn_version, + 0, + block_timestamp, + ); + + let shares: BigDecimal = table_item_model + .decoded_value + .as_ref() + .unwrap() + .as_str() + .unwrap() + .parse::() + .context(format!( + "cannot parse string as u128: {:?}, version {}", + table_item_model.decoded_value.as_ref(), + txn_version + ))?; + let shares = shares / &pool_balance.scaling_factor; + Ok(Some(( + RawDelegatorBalance { + transaction_version: txn_version, + write_set_change_index, + delegator_address: delegator_address.clone(), + pool_address: pool_address.clone(), + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), + shares: shares.clone(), + parent_table_handle: inactive_pool_handle.clone(), + block_timestamp, + }, + Self { + delegator_address, + pool_address, + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares, + parent_table_handle: inactive_pool_handle, + block_timestamp, + }, + ))) + } else { + Ok(None) + } + } + + // Setting amount to 0 if table item is deleted + pub fn get_active_share_from_delete_table_item( + delete_table_item: &DeleteTableItem, + txn_version: i64, + write_set_change_index: i64, + active_pool_to_staking_pool: &ShareToRawStakingPoolMapping, + block_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + let table_handle = standardize_address(&delete_table_item.handle.to_string()); + // The mapping will tell us if the table item is an active share table + + if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { + let delegator_address = standardize_address(&delete_table_item.key.to_string()); + + return Ok(Some(( + RawDelegatorBalance { + transaction_version: txn_version, + write_set_change_index, + delegator_address: delegator_address.clone(), + pool_address: pool_balance.staking_pool_address.clone(), + pool_type: "active_shares".to_string(), + table_handle: table_handle.clone(), + shares: BigDecimal::zero(), + parent_table_handle: table_handle.clone(), + block_timestamp, + }, + Self { + delegator_address, + pool_address: pool_balance.staking_pool_address.clone(), + pool_type: "active_shares".to_string(), + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares: BigDecimal::zero(), + parent_table_handle: table_handle, + block_timestamp, + }, + ))); + } + Ok(None) + } + + // Setting amount to 0 if table item is deleted + pub async fn get_inactive_share_from_delete_table_item( + delete_table_item: &DeleteTableItem, + txn_version: i64, + write_set_change_index: i64, + inactive_pool_to_staking_pool: &ShareToRawStakingPoolMapping, + inactive_share_to_pool: &ShareToRawPoolMapping, + conn: &mut DbPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, + block_timestamp: chrono::NaiveDateTime, + ) -> anyhow::Result> { + let table_handle = standardize_address(&delete_table_item.handle.to_string()); + // The mapping will tell us if the table item belongs to an inactive pool + if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { + // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool + let inactive_pool_handle = pool_balance.parent_table_handle.clone(); + + let pool_address = match inactive_pool_to_staking_pool + .get(&inactive_pool_handle) + .map(|metadata| metadata.staking_pool_address.clone()) + { + Some(pool_address) => pool_address, + None => Self::get_staking_pool_from_inactive_share_handle( + conn, + &inactive_pool_handle, + query_retries, + query_retry_delay_ms, + ) + .await + .context(format!( + "Failed to get staking pool from inactive share handle {}, txn version {}", + inactive_pool_handle, txn_version + ))?, + }; + let delegator_address = standardize_address(&delete_table_item.key.to_string()); + + return Ok(Some(( + RawDelegatorBalance { + transaction_version: txn_version, + write_set_change_index, + delegator_address: delegator_address.clone(), + pool_address: pool_address.clone(), + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), + shares: BigDecimal::zero(), + parent_table_handle: inactive_pool_handle.clone(), + block_timestamp, + }, + Self { + delegator_address, + pool_address, + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares: BigDecimal::zero(), + parent_table_handle: table_handle, + block_timestamp, + }, + ))); + } + Ok(None) + } + + /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool + /// Value is the same metadata although it's not really used + pub fn get_active_pool_to_staking_pool_mapping( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( + write_resource, + txn_version, + )? { + Ok(Some(AHashMap::from([( + balance.active_share_table_handle.clone(), + balance, + )]))) + } else { + Ok(None) + } + } + + /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool + /// Value is the same metadata although it's not really used + pub fn get_inactive_pool_to_staking_pool_mapping( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( + write_resource, + txn_version, + )? { + Ok(Some(AHashMap::from([( + balance.inactive_share_table_handle.clone(), + balance, + )]))) + } else { + Ok(None) + } + } + + /// Key is the inactive share table handle obtained from 0x1::pool_u64_unbound::Pool + /// Value is the 0x1::pool_u64_unbound::Pool metadata that will be used to populate a user's inactive balance + pub fn get_inactive_share_to_pool_mapping( + write_table_item: &WriteTableItem, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_inactive_pool_metadata_from_write_table_item( + write_table_item, + txn_version, + )? { + Ok(Some(AHashMap::from([( + balance.shares_table_handle.clone(), + balance, + )]))) + } else { + Ok(None) + } + } + + pub async fn get_staking_pool_from_inactive_share_handle( + conn: &mut DbPoolConnection<'_>, + table_handle: &str, + query_retries: u32, + query_retry_delay_ms: u64, + ) -> anyhow::Result { + let mut tried = 0; + while tried < query_retries { + tried += 1; + match CurrentDelegatorBalanceQuery::get_by_inactive_share_handle(conn, table_handle) + .await + { + Ok(current_delegator_balance) => return Ok(current_delegator_balance.pool_address), + Err(_) => { + if tried < query_retries { + tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) + .await; + } + }, + } + } + Err(anyhow::anyhow!( + "Failed to get staking pool address from inactive share handle" + )) + } + + pub async fn from_transaction( + transaction: &Transaction, + active_pool_to_staking_pool: &ShareToRawStakingPoolMapping, + conn: &mut DbPoolConnection<'_>, + query_retries: u32, + query_retry_delay_ms: u64, + ) -> anyhow::Result<(Vec, RawCurrentDelegatorBalanceMap)> { + let mut inactive_pool_to_staking_pool: ShareToRawStakingPoolMapping = AHashMap::new(); + let mut inactive_share_to_pool: ShareToRawPoolMapping = AHashMap::new(); + let mut current_delegator_balances: RawCurrentDelegatorBalanceMap = AHashMap::new(); + let mut delegator_balances = vec![]; + let txn_version = transaction.version as i64; + let txn_timestamp = parse_timestamp(transaction.timestamp.as_ref().unwrap(), txn_version); + + let changes = &transaction.info.as_ref().unwrap().changes; + // Do a first pass to get the mapping of active_share table handles to staking pool resource let txn_version = transaction.version as i64; + for wsc in changes { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + if let Some(map) = + Self::get_inactive_pool_to_staking_pool_mapping(write_resource, txn_version) + .unwrap() + { + inactive_pool_to_staking_pool.extend(map); + } + } + + if let Change::WriteTableItem(table_item) = wsc.change.as_ref().unwrap() { + if let Some(map) = + Self::get_inactive_share_to_pool_mapping(table_item, txn_version).unwrap() + { + inactive_share_to_pool.extend(map); + } + } + } + // Now make a pass through table items to get the actual delegator balances + for (index, wsc) in changes.iter().enumerate() { + let maybe_delegator_balance = match wsc.change.as_ref().unwrap() { + Change::DeleteTableItem(table_item) => { + if let Some((balance, current_balance)) = + Self::get_active_share_from_delete_table_item( + table_item, + txn_version, + index as i64, + active_pool_to_staking_pool, + txn_timestamp, + ) + .unwrap() + { + Some((balance, current_balance)) + } else { + Self::get_inactive_share_from_delete_table_item( + table_item, + txn_version, + index as i64, + &inactive_pool_to_staking_pool, + &inactive_share_to_pool, + conn, + query_retries, + query_retry_delay_ms, + txn_timestamp, + ) + .await + .unwrap() + } + }, + Change::WriteTableItem(table_item) => { + if let Some((balance, current_balance)) = + Self::get_active_share_from_write_table_item( + table_item, + txn_version, + index as i64, + active_pool_to_staking_pool, + txn_timestamp, + ) + .await + .unwrap() + { + Some((balance, current_balance)) + } else { + Self::get_inactive_share_from_write_table_item( + table_item, + txn_version, + index as i64, + &inactive_pool_to_staking_pool, + &inactive_share_to_pool, + conn, + query_retries, + query_retry_delay_ms, + txn_timestamp, + ) + .await + .unwrap() + } + }, + _ => None, + }; + if let Some((delegator_balance, current_delegator_balance)) = maybe_delegator_balance { + delegator_balances.push(delegator_balance); + current_delegator_balances.insert( + ( + current_delegator_balance.delegator_address.clone(), + current_delegator_balance.pool_address.clone(), + current_delegator_balance.pool_type.clone(), + ), + current_delegator_balance, + ); + } + } + Ok((delegator_balances, current_delegator_balances)) + } +} + +impl CurrentDelegatorBalanceQuery { + pub async fn get_by_inactive_share_handle( + conn: &mut DbPoolConnection<'_>, + table_handle: &str, + ) -> diesel::QueryResult { + current_delegator_balances::table + .filter(current_delegator_balances::parent_table_handle.eq(table_handle)) + .first::(conn) + .await + } +} diff --git a/rust/processor/src/db/common/models/stake_models/delegator_pools.rs b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs new file mode 100644 index 000000000..ff7474e32 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs @@ -0,0 +1,258 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use super::stake_utils::{StakeResource, StakeTableItem}; +use crate::{ + schema::{ + current_delegated_staking_pool_balances, delegated_staking_pool_balances, + delegated_staking_pools, + }, + utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{ + transaction::TxnData, write_set_change::Change, Transaction, WriteResource, WriteTableItem, +}; +use bigdecimal::BigDecimal; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +type StakingPoolAddress = String; +pub type DelegatorPoolMap = AHashMap; +pub type DelegatorPoolBalanceMap = AHashMap; + +// All pools +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(staking_pool_address))] +#[diesel(table_name = delegated_staking_pools)] +pub struct DelegatorPool { + pub staking_pool_address: String, + pub first_transaction_version: i64, +} + +// Metadata to fill pool balances and delegator balance +#[derive(Debug, Deserialize, Serialize)] +pub struct RawDelegatorPoolBalanceMetadata { + pub transaction_version: i64, + pub staking_pool_address: String, + pub total_coins: BigDecimal, + pub total_shares: BigDecimal, + pub scaling_factor: BigDecimal, + pub operator_commission_percentage: BigDecimal, + pub active_share_table_handle: String, + pub inactive_share_table_handle: String, +} + +pub trait RawDelegatorPoolBalanceMetadataConvertible { + fn from_raw(raw: RawDelegatorPoolBalanceMetadata) -> Self; +} + +// Similar metadata but specifically for 0x1::pool_u64_unbound::Pool +#[derive(Debug, Deserialize, Serialize)] +pub struct RawPoolBalanceMetadata { + pub transaction_version: i64, + pub total_coins: BigDecimal, + pub total_shares: BigDecimal, + pub scaling_factor: BigDecimal, + pub shares_table_handle: String, + pub parent_table_handle: String, +} +pub trait RawPoolBalanceMetadataConvertible { + fn from_raw(raw: RawPoolBalanceMetadata) -> Self; +} + +// Pools balances +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(transaction_version, staking_pool_address))] +#[diesel(table_name = delegated_staking_pool_balances)] +pub struct RawDelegatorPoolBalance { + pub transaction_version: i64, + pub staking_pool_address: String, + pub total_coins: BigDecimal, + pub total_shares: BigDecimal, + pub operator_commission_percentage: BigDecimal, + pub inactive_table_handle: String, + pub active_table_handle: String, +} +pub trait RawDelegatorPoolBalanceConvertible { + fn from_raw(raw: RawDelegatorPoolBalance) -> Self; +} + +// All pools w latest balances (really a more comprehensive version than DelegatorPool) +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(staking_pool_address))] +#[diesel(table_name = current_delegated_staking_pool_balances)] +pub struct RawCurrentDelegatorPoolBalance { + pub staking_pool_address: String, + pub total_coins: BigDecimal, + pub total_shares: BigDecimal, + pub last_transaction_version: i64, + pub operator_commission_percentage: BigDecimal, + pub inactive_table_handle: String, + pub active_table_handle: String, +} + +pub trait RawCurrentDelegatorPoolBalanceConvertible { + fn from_raw(raw: RawCurrentDelegatorPoolBalance) -> Self; +} + +impl DelegatorPool { + pub fn from_transaction( + transaction: &Transaction, + ) -> anyhow::Result<( + DelegatorPoolMap, + Vec, + DelegatorPoolBalanceMap, + )> { + let mut delegator_pool_map = AHashMap::new(); + let mut delegator_pool_balances = vec![]; + let mut delegator_pool_balances_map = AHashMap::new(); + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["DelegatorPool"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return Ok(( + delegator_pool_map, + delegator_pool_balances, + delegator_pool_balances_map, + )); + }, + }; + let txn_version = transaction.version as i64; + + // Do a first pass to get the mapping of active_share table handles to staking pool addresses + if let TxnData::User(_) = txn_data { + let changes = &transaction + .info + .as_ref() + .expect("Transaction info doesn't exist!") + .changes; + for wsc in changes { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + let maybe_write_resource = + Self::from_write_resource(write_resource, txn_version)?; + if let Some((pool, pool_balances, current_pool_balances)) = maybe_write_resource + { + let staking_pool_address = pool.staking_pool_address.clone(); + delegator_pool_map.insert(staking_pool_address.clone(), pool); + delegator_pool_balances.push(pool_balances); + delegator_pool_balances_map + .insert(staking_pool_address.clone(), current_pool_balances); + } + } + } + } + Ok(( + delegator_pool_map, + delegator_pool_balances, + delegator_pool_balances_map, + )) + } + + pub fn get_delegated_pool_metadata_from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(StakeResource::DelegationPool(inner)) = + StakeResource::from_write_resource(write_resource, txn_version)? + { + let staking_pool_address = standardize_address(&write_resource.address.to_string()); + let total_coins = inner.active_shares.total_coins; + let total_shares = + &inner.active_shares.total_shares / &inner.active_shares.scaling_factor; + Ok(Some(RawDelegatorPoolBalanceMetadata { + transaction_version: txn_version, + staking_pool_address, + total_coins, + total_shares, + scaling_factor: inner.active_shares.scaling_factor, + operator_commission_percentage: inner.operator_commission_percentage.clone(), + active_share_table_handle: inner.active_shares.shares.inner.get_handle(), + inactive_share_table_handle: inner.inactive_shares.get_handle(), + })) + } else { + Ok(None) + } + } + + pub fn get_inactive_pool_metadata_from_write_table_item( + write_table_item: &WriteTableItem, + txn_version: i64, + ) -> anyhow::Result> { + let table_item_data = write_table_item.data.as_ref().unwrap(); + + if let Some(StakeTableItem::Pool(inner)) = &StakeTableItem::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + let total_coins = inner.total_coins.clone(); + let total_shares = &inner.total_shares / &inner.scaling_factor; + Ok(Some(RawPoolBalanceMetadata { + transaction_version: txn_version, + total_coins, + total_shares, + scaling_factor: inner.scaling_factor.clone(), + shares_table_handle: inner.shares.inner.get_handle(), + parent_table_handle: standardize_address(&write_table_item.handle.to_string()), + })) + } else { + Ok(None) + } + } + + pub fn from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result< + Option<( + Self, + RawDelegatorPoolBalance, + RawCurrentDelegatorPoolBalance, + )>, + > { + if let Some(balance) = + &Self::get_delegated_pool_metadata_from_write_resource(write_resource, txn_version)? + { + let staking_pool_address = balance.staking_pool_address.clone(); + let total_coins = balance.total_coins.clone(); + let total_shares = balance.total_shares.clone(); + let transaction_version = balance.transaction_version; + Ok(Some(( + Self { + staking_pool_address: staking_pool_address.clone(), + first_transaction_version: transaction_version, + }, + RawDelegatorPoolBalance { + transaction_version, + staking_pool_address: staking_pool_address.clone(), + total_coins: total_coins.clone(), + total_shares: total_shares.clone(), + operator_commission_percentage: balance.operator_commission_percentage.clone(), + inactive_table_handle: balance.inactive_share_table_handle.clone(), + active_table_handle: balance.active_share_table_handle.clone(), + }, + RawCurrentDelegatorPoolBalance { + staking_pool_address, + total_coins, + total_shares, + last_transaction_version: transaction_version, + operator_commission_percentage: balance.operator_commission_percentage.clone(), + inactive_table_handle: balance.inactive_share_table_handle.clone(), + active_table_handle: balance.active_share_table_handle.clone(), + }, + ))) + } else { + Ok(None) + } + } +} diff --git a/rust/processor/src/db/common/models/stake_models/mod.rs b/rust/processor/src/db/common/models/stake_models/mod.rs new file mode 100644 index 000000000..6dec6aad9 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/mod.rs @@ -0,0 +1,7 @@ +pub mod current_delegated_voter; +pub mod delegator_activities; +pub mod delegator_balances; +pub mod delegator_pools; +pub mod proposal_voters; +pub mod stake_utils; +pub mod staking_pool_voter; diff --git a/rust/processor/src/db/common/models/stake_models/proposal_voters.rs b/rust/processor/src/db/common/models/stake_models/proposal_voters.rs new file mode 100644 index 000000000..3344d00a9 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/proposal_voters.rs @@ -0,0 +1,76 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use crate::{ + db::common::models::stake_models::stake_utils::StakeEvent, + schema::proposal_votes, + utils::{ + counters::PROCESSOR_UNKNOWN_TYPE_COUNT, + util::{parse_timestamp, standardize_address}, + }, +}; +use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; +use bigdecimal::BigDecimal; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[diesel(primary_key(transaction_version, proposal_id, voter_address))] +#[diesel(table_name = proposal_votes)] +pub struct RawProposalVote { + pub transaction_version: i64, + pub proposal_id: i64, + pub voter_address: String, + pub staking_pool_address: String, + pub num_votes: BigDecimal, + pub should_pass: bool, + pub transaction_timestamp: chrono::NaiveDateTime, +} +pub trait RawProposalVoteConvertible { + fn from_raw(raw: RawProposalVote) -> Self; +} + +impl RawProposalVote { + pub fn from_transaction(transaction: &Transaction) -> anyhow::Result> { + let mut proposal_votes = vec![]; + let txn_data = match transaction.txn_data.as_ref() { + Some(data) => data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["ProposalVote"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + return Ok(proposal_votes); + }, + }; + let txn_version = transaction.version as i64; + + if let TxnData::User(user_txn) = txn_data { + for event in user_txn.events.iter() { + if let Some(StakeEvent::GovernanceVoteEvent(ev)) = + StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? + { + proposal_votes.push(Self { + transaction_version: txn_version, + proposal_id: ev.proposal_id as i64, + voter_address: standardize_address(&ev.voter), + staking_pool_address: standardize_address(&ev.stake_pool), + num_votes: ev.num_votes.clone(), + should_pass: ev.should_pass, + transaction_timestamp: parse_timestamp( + transaction.timestamp.as_ref().unwrap(), + txn_version, + ), + }); + } + } + } + Ok(proposal_votes) + } +} diff --git a/rust/processor/src/db/postgres/models/stake_models/stake_utils.rs b/rust/processor/src/db/common/models/stake_models/stake_utils.rs similarity index 100% rename from rust/processor/src/db/postgres/models/stake_models/stake_utils.rs rename to rust/processor/src/db/common/models/stake_models/stake_utils.rs diff --git a/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs b/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs new file mode 100644 index 000000000..dce37dad8 --- /dev/null +++ b/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs @@ -0,0 +1,58 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use crate::{ + db::common::models::stake_models::stake_utils::StakeResource, + utils::util::{parse_timestamp, standardize_address}, +}; +use ahash::AHashMap; +use aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; +type StakingPoolAddress = String; +pub type StakingPoolRawVoterMap = AHashMap; + +pub struct RawCurrentStakingPoolVoter { + pub staking_pool_address: String, + pub voter_address: String, + pub last_transaction_version: i64, + pub operator_address: String, + pub block_timestamp: chrono::NaiveDateTime, +} + +pub trait RawCurrentStakingPoolVoterConvertible { + fn from_raw(raw: RawCurrentStakingPoolVoter) -> Self; +} + +impl RawCurrentStakingPoolVoter { + pub fn from_transaction(transaction: &Transaction) -> anyhow::Result { + let mut staking_pool_voters = AHashMap::new(); + + let txn_version = transaction.version as i64; + let timestamp = transaction + .timestamp + .as_ref() + .expect("Transaction timestamp doesn't exist!"); + let block_timestamp = parse_timestamp(timestamp, txn_version); + for wsc in &transaction.info.as_ref().unwrap().changes { + if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { + if let Some(StakeResource::StakePool(inner)) = + StakeResource::from_write_resource(write_resource, txn_version)? + { + let staking_pool_address = + standardize_address(&write_resource.address.to_string()); + staking_pool_voters.insert(staking_pool_address.clone(), Self { + staking_pool_address, + voter_address: inner.get_delegated_voter(), + last_transaction_version: txn_version, + operator_address: inner.get_operator_address(), + block_timestamp, + }); + } + } + } + + Ok(staking_pool_voters) + } +} diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index fb9742f2d..318613f27 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -4,8 +4,8 @@ pub mod default_models; pub mod event_models; pub mod fungible_asset_models; pub mod object_models; +pub mod stake_models; pub mod token_v2_models; pub mod transaction_metadata_model; pub mod user_transaction_models; - const DEFAULT_NONE: &str = "NULL"; diff --git a/rust/processor/src/db/parquet/models/stake_models/mod.rs b/rust/processor/src/db/parquet/models/stake_models/mod.rs new file mode 100644 index 000000000..eb8e06152 --- /dev/null +++ b/rust/processor/src/db/parquet/models/stake_models/mod.rs @@ -0,0 +1,3 @@ +pub mod parquet_delegator_activities; +pub mod parquet_delegator_balances; +pub mod parquet_proposal_voters; diff --git a/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_activities.rs b/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_activities.rs new file mode 100644 index 000000000..2a6bfd6ce --- /dev/null +++ b/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_activities.rs @@ -0,0 +1,54 @@ +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::stake_models::delegator_activities::{ + RawDelegatedStakingActivity, RawDelegatedStakingActivityConvertible, + }, +}; +use allocative_derive::Allocative; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct DelegatedStakingActivity { + pub transaction_version: i64, + pub event_index: i64, + pub delegator_address: String, + pub pool_address: String, + pub event_type: String, + pub amount: String, // BigDecimal + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl HasVersion for DelegatedStakingActivity { + fn version(&self) -> i64 { + self.transaction_version + } +} + +impl NamedTable for DelegatedStakingActivity { + const TABLE_NAME: &'static str = "delegated_staking_activities"; +} + +impl GetTimeStamp for DelegatedStakingActivity { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +impl RawDelegatedStakingActivityConvertible for DelegatedStakingActivity { + fn from_raw(raw: RawDelegatedStakingActivity) -> Self { + Self { + transaction_version: raw.transaction_version, + event_index: raw.event_index, + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + event_type: raw.event_type, + amount: raw.amount.to_string(), + block_timestamp: raw.block_timestamp, + } + } +} diff --git a/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_balances.rs b/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_balances.rs new file mode 100644 index 000000000..c05b52224 --- /dev/null +++ b/rust/processor/src/db/parquet/models/stake_models/parquet_delegator_balances.rs @@ -0,0 +1,105 @@ +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::stake_models::delegator_balances::{ + RawCurrentDelegatorBalance, RawCurrentDelegatorBalanceConvertible, RawDelegatorBalance, + RawDelegatorBalanceConvertible, + }, +}; +use allocative::Allocative; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct CurrentDelegatorBalance { + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub last_transaction_version: i64, + pub shares: String, // BigDecimal + pub parent_table_handle: String, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl RawCurrentDelegatorBalanceConvertible for CurrentDelegatorBalance { + fn from_raw(raw: RawCurrentDelegatorBalance) -> Self { + Self { + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + pool_type: raw.pool_type, + table_handle: raw.table_handle, + last_transaction_version: raw.last_transaction_version, + shares: raw.shares.to_string(), + parent_table_handle: raw.parent_table_handle, + block_timestamp: raw.block_timestamp, + } + } +} + +impl HasVersion for CurrentDelegatorBalance { + fn version(&self) -> i64 { + self.last_transaction_version + } +} + +impl NamedTable for CurrentDelegatorBalance { + const TABLE_NAME: &'static str = "current_delegator_balances"; +} + +impl GetTimeStamp for CurrentDelegatorBalance { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct DelegatorBalance { + pub transaction_version: i64, + pub write_set_change_index: i64, + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub shares: String, // BigDecimal + pub parent_table_handle: String, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for DelegatorBalance { + const TABLE_NAME: &'static str = "delegator_balances"; +} + +impl HasVersion for DelegatorBalance { + fn version(&self) -> i64 { + self.transaction_version + } +} + +impl GetTimeStamp for DelegatorBalance { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + +impl RawDelegatorBalanceConvertible for DelegatorBalance { + fn from_raw(raw: RawDelegatorBalance) -> Self { + Self { + transaction_version: raw.transaction_version, + write_set_change_index: raw.write_set_change_index, + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + pool_type: raw.pool_type, + table_handle: raw.table_handle, + shares: raw.shares.to_string(), + parent_table_handle: raw.parent_table_handle, + block_timestamp: raw.block_timestamp, + } + } +} diff --git a/rust/processor/src/db/parquet/models/stake_models/parquet_proposal_voters.rs b/rust/processor/src/db/parquet/models/stake_models/parquet_proposal_voters.rs new file mode 100644 index 000000000..9050137ed --- /dev/null +++ b/rust/processor/src/db/parquet/models/stake_models/parquet_proposal_voters.rs @@ -0,0 +1,54 @@ +use crate::{ + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::stake_models::proposal_voters::{ + RawProposalVote, RawProposalVoteConvertible, + }, +}; +use allocative_derive::Allocative; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize, +)] +pub struct ProposalVote { + pub transaction_version: i64, + pub proposal_id: i64, + pub voter_address: String, + pub staking_pool_address: String, + pub num_votes: String, // BigDecimal + pub should_pass: bool, + #[allocative(skip)] + pub transaction_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for ProposalVote { + const TABLE_NAME: &'static str = "proposal_votes"; +} + +impl HasVersion for ProposalVote { + fn version(&self) -> i64 { + self.transaction_version + } +} + +impl GetTimeStamp for ProposalVote { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.transaction_timestamp + } +} + +impl RawProposalVoteConvertible for ProposalVote { + fn from_raw(raw: RawProposalVote) -> Self { + Self { + transaction_version: raw.transaction_version, + proposal_id: raw.proposal_id, + voter_address: raw.voter_address, + staking_pool_address: raw.staking_pool_address, + num_votes: raw.num_votes.to_string(), + should_pass: raw.should_pass, + transaction_timestamp: raw.transaction_timestamp, + } + } +} diff --git a/rust/processor/src/db/postgres/models/stake_models/delegator_activities.rs b/rust/processor/src/db/postgres/models/stake_models/delegator_activities.rs index e087480bb..efe8f959e 100644 --- a/rust/processor/src/db/postgres/models/stake_models/delegator_activities.rs +++ b/rust/processor/src/db/postgres/models/stake_models/delegator_activities.rs @@ -3,15 +3,12 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::StakeEvent; use crate::{ - schema::delegated_staking_activities, - utils::{ - counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - util::{standardize_address, u64_to_bigdecimal}, + db::common::models::stake_models::delegator_activities::{ + RawDelegatedStakingActivity, RawDelegatedStakingActivityConvertible, }, + schema::delegated_staking_activities, }; -use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -28,82 +25,15 @@ pub struct DelegatedStakingActivity { pub amount: BigDecimal, } -impl DelegatedStakingActivity { - /// Pretty straightforward parsing from known delegated staking events - pub fn from_transaction(transaction: &Transaction) -> anyhow::Result> { - let mut delegator_activities = vec![]; - let txn_data = match transaction.txn_data.as_ref() { - Some(data) => data, - None => { - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["DelegatedStakingActivity"]) - .inc(); - tracing::warn!( - transaction_version = transaction.version, - "Transaction data doesn't exist", - ); - return Ok(delegator_activities); - }, - }; - - let txn_version = transaction.version as i64; - let events = match txn_data { - TxnData::User(txn) => &txn.events, - TxnData::BlockMetadata(txn) => &txn.events, - TxnData::Validator(txn) => &txn.events, - _ => return Ok(delegator_activities), - }; - for (index, event) in events.iter().enumerate() { - let event_index = index as i64; - if let Some(staking_event) = - StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? - { - let activity = match staking_event { - StakeEvent::AddStakeEvent(inner) => DelegatedStakingActivity { - transaction_version: txn_version, - event_index, - delegator_address: standardize_address(&inner.delegator_address), - pool_address: standardize_address(&inner.pool_address), - event_type: event.type_str.clone(), - amount: u64_to_bigdecimal(inner.amount_added), - }, - StakeEvent::UnlockStakeEvent(inner) => DelegatedStakingActivity { - transaction_version: txn_version, - event_index, - delegator_address: standardize_address(&inner.delegator_address), - pool_address: standardize_address(&inner.pool_address), - event_type: event.type_str.clone(), - amount: u64_to_bigdecimal(inner.amount_unlocked), - }, - StakeEvent::WithdrawStakeEvent(inner) => DelegatedStakingActivity { - transaction_version: txn_version, - event_index, - delegator_address: standardize_address(&inner.delegator_address), - pool_address: standardize_address(&inner.pool_address), - event_type: event.type_str.clone(), - amount: u64_to_bigdecimal(inner.amount_withdrawn), - }, - StakeEvent::ReactivateStakeEvent(inner) => DelegatedStakingActivity { - transaction_version: txn_version, - event_index, - delegator_address: standardize_address(&inner.delegator_address), - pool_address: standardize_address(&inner.pool_address), - event_type: event.type_str.clone(), - amount: u64_to_bigdecimal(inner.amount_reactivated), - }, - StakeEvent::DistributeRewardsEvent(inner) => DelegatedStakingActivity { - transaction_version: txn_version, - event_index, - delegator_address: "".to_string(), - pool_address: standardize_address(&inner.pool_address), - event_type: event.type_str.clone(), - amount: u64_to_bigdecimal(inner.rewards_amount), - }, - _ => continue, - }; - delegator_activities.push(activity); - } +impl RawDelegatedStakingActivityConvertible for DelegatedStakingActivity { + fn from_raw(raw: RawDelegatedStakingActivity) -> Self { + Self { + transaction_version: raw.transaction_version, + event_index: raw.event_index, + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + event_type: raw.event_type, + amount: raw.amount, } - Ok(delegator_activities) } } diff --git a/rust/processor/src/db/postgres/models/stake_models/delegator_balances.rs b/rust/processor/src/db/postgres/models/stake_models/delegator_balances.rs index d76feac0b..11d1406cf 100644 --- a/rust/processor/src/db/postgres/models/stake_models/delegator_balances.rs +++ b/rust/processor/src/db/postgres/models/stake_models/delegator_balances.rs @@ -2,35 +2,26 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] - -use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata, PoolBalanceMetadata}; use crate::{ - db::{ - common::models::default_models::raw_table_items::RawTableItem, - postgres::models::default_models::move_tables::TableItem, + db::common::models::stake_models::{ + delegator_balances::{ + RawCurrentDelegatorBalance, RawCurrentDelegatorBalanceConvertible, RawDelegatorBalance, + RawDelegatorBalanceConvertible, + }, + delegator_pools::{RawDelegatorPoolBalanceMetadata, RawPoolBalanceMetadata}, }, schema::{current_delegator_balances, delegator_balances}, - utils::{ - database::DbPoolConnection, - util::{parse_timestamp, standardize_address}, - }, }; use ahash::AHashMap; -use anyhow::Context; -use aptos_protos::transaction::v1::{ - write_set_change::Change, DeleteTableItem, Transaction, WriteResource, WriteTableItem, -}; -use bigdecimal::{BigDecimal, Zero}; -use chrono::NaiveDateTime; +use bigdecimal::BigDecimal; use diesel::prelude::*; -use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; pub type TableHandle = String; pub type Address = String; -pub type ShareToStakingPoolMapping = AHashMap; -pub type ShareToPoolMapping = AHashMap; +pub type ShareToStakingPoolMapping = AHashMap; +pub type ShareToPoolMapping = AHashMap; pub type CurrentDelegatorBalancePK = (Address, Address, String); pub type CurrentDelegatorBalanceMap = AHashMap; @@ -47,6 +38,20 @@ pub struct CurrentDelegatorBalance { pub parent_table_handle: String, } +impl RawCurrentDelegatorBalanceConvertible for CurrentDelegatorBalance { + fn from_raw(raw: RawCurrentDelegatorBalance) -> Self { + Self { + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + pool_type: raw.pool_type, + table_handle: raw.table_handle, + last_transaction_version: raw.last_transaction_version, + shares: raw.shares, + parent_table_handle: raw.parent_table_handle, + } + } +} + #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] #[diesel(table_name = delegator_balances)] @@ -61,476 +66,17 @@ pub struct DelegatorBalance { pub parent_table_handle: String, } -#[derive(Debug, Identifiable, Queryable)] -#[diesel(primary_key(delegator_address, pool_address, pool_type))] -#[diesel(table_name = current_delegator_balances)] -pub struct CurrentDelegatorBalanceQuery { - pub delegator_address: String, - pub pool_address: String, - pub pool_type: String, - pub table_handle: String, - pub last_transaction_version: i64, - pub inserted_at: chrono::NaiveDateTime, - pub shares: BigDecimal, - pub parent_table_handle: String, -} - -impl CurrentDelegatorBalance { - /// Getting active share balances. Only 1 active pool per staking pool tracked in a single table - pub async fn get_active_share_from_write_table_item( - write_table_item: &WriteTableItem, - txn_version: i64, - write_set_change_index: i64, - active_pool_to_staking_pool: &ShareToStakingPoolMapping, - block_timestamp: NaiveDateTime, - ) -> anyhow::Result> { - let table_handle = standardize_address(&write_table_item.handle.to_string()); - // The mapping will tell us if the table item is an active share table - if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { - let pool_address = pool_balance.staking_pool_address.clone(); - let delegator_address = standardize_address(&write_table_item.key.to_string()); - - // Convert to TableItem model. Some fields are just placeholders - let table_item_model: TableItem = RawTableItem::postgres_table_item_from_write_item( - write_table_item, - 0, - txn_version, - 0, - block_timestamp, - ); - - let shares: BigDecimal = table_item_model - .decoded_value - .as_ref() - .unwrap() - .as_str() - .unwrap() - .parse::() - .context(format!( - "cannot parse string as u128: {:?}, version {}", - table_item_model.decoded_value.as_ref(), - txn_version - ))?; - let shares = shares / &pool_balance.scaling_factor; - Ok(Some(( - DelegatorBalance { - transaction_version: txn_version, - write_set_change_index, - delegator_address: delegator_address.clone(), - pool_address: pool_address.clone(), - pool_type: "active_shares".to_string(), - table_handle: table_handle.clone(), - shares: shares.clone(), - parent_table_handle: table_handle.clone(), - }, - Self { - delegator_address, - pool_address, - pool_type: "active_shares".to_string(), - table_handle: table_handle.clone(), - last_transaction_version: txn_version, - shares, - parent_table_handle: table_handle, - }, - ))) - } else { - Ok(None) - } - } - - /// Getting inactive share balances. There could be multiple inactive pool per staking pool so we have - /// 2 layers of mapping (table w/ all inactive pools -> staking pool, table w/ delegator inactive shares -> each inactive pool) - pub async fn get_inactive_share_from_write_table_item( - write_table_item: &WriteTableItem, - txn_version: i64, - write_set_change_index: i64, - inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, - inactive_share_to_pool: &ShareToPoolMapping, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - block_timestamp: chrono::NaiveDateTime, - ) -> anyhow::Result> { - let table_handle = standardize_address(&write_table_item.handle.to_string()); - // The mapping will tell us if the table item belongs to an inactive pool - if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { - // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool - let inactive_pool_handle = pool_balance.parent_table_handle.clone(); - - let pool_address = match inactive_pool_to_staking_pool - .get(&inactive_pool_handle) - .map(|metadata| metadata.staking_pool_address.clone()) - { - Some(pool_address) => pool_address, - None => { - match Self::get_staking_pool_from_inactive_share_handle( - conn, - &inactive_pool_handle, - query_retries, - query_retry_delay_ms, - ) - .await - { - Ok(pool) => pool, - Err(_) => { - tracing::error!( - transaction_version = txn_version, - lookup_key = &inactive_pool_handle, - "Failed to get staking pool address from inactive share handle. You probably should backfill db.", - ); - return Ok(None); - }, - } - }, - }; - let delegator_address = standardize_address(&write_table_item.key.to_string()); - // Convert to TableItem model. Some fields are just placeholders - let table_item_model = RawTableItem::postgres_table_item_from_write_item( - write_table_item, - 0, - txn_version, - 0, - block_timestamp, - ); - - let shares: BigDecimal = table_item_model - .decoded_value - .as_ref() - .unwrap() - .as_str() - .unwrap() - .parse::() - .context(format!( - "cannot parse string as u128: {:?}, version {}", - table_item_model.decoded_value.as_ref(), - txn_version - ))?; - let shares = shares / &pool_balance.scaling_factor; - Ok(Some(( - DelegatorBalance { - transaction_version: txn_version, - write_set_change_index, - delegator_address: delegator_address.clone(), - pool_address: pool_address.clone(), - pool_type: "inactive_shares".to_string(), - table_handle: table_handle.clone(), - shares: shares.clone(), - parent_table_handle: inactive_pool_handle.clone(), - }, - Self { - delegator_address, - pool_address, - pool_type: "inactive_shares".to_string(), - table_handle: table_handle.clone(), - last_transaction_version: txn_version, - shares, - parent_table_handle: inactive_pool_handle, - }, - ))) - } else { - Ok(None) +impl RawDelegatorBalanceConvertible for DelegatorBalance { + fn from_raw(raw: RawDelegatorBalance) -> Self { + Self { + transaction_version: raw.transaction_version, + write_set_change_index: raw.write_set_change_index, + delegator_address: raw.delegator_address, + pool_address: raw.pool_address, + pool_type: raw.pool_type, + table_handle: raw.table_handle, + shares: raw.shares, + parent_table_handle: raw.parent_table_handle, } } - - // Setting amount to 0 if table item is deleted - pub fn get_active_share_from_delete_table_item( - delete_table_item: &DeleteTableItem, - txn_version: i64, - write_set_change_index: i64, - active_pool_to_staking_pool: &ShareToStakingPoolMapping, - ) -> anyhow::Result> { - let table_handle = standardize_address(&delete_table_item.handle.to_string()); - // The mapping will tell us if the table item is an active share table - if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { - let delegator_address = standardize_address(&delete_table_item.key.to_string()); - - return Ok(Some(( - DelegatorBalance { - transaction_version: txn_version, - write_set_change_index, - delegator_address: delegator_address.clone(), - pool_address: pool_balance.staking_pool_address.clone(), - pool_type: "active_shares".to_string(), - table_handle: table_handle.clone(), - shares: BigDecimal::zero(), - parent_table_handle: table_handle.clone(), - }, - Self { - delegator_address, - pool_address: pool_balance.staking_pool_address.clone(), - pool_type: "active_shares".to_string(), - table_handle: table_handle.clone(), - last_transaction_version: txn_version, - shares: BigDecimal::zero(), - parent_table_handle: table_handle, - }, - ))); - } - Ok(None) - } - - // Setting amount to 0 if table item is deleted - pub async fn get_inactive_share_from_delete_table_item( - delete_table_item: &DeleteTableItem, - txn_version: i64, - write_set_change_index: i64, - inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, - inactive_share_to_pool: &ShareToPoolMapping, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result> { - let table_handle = standardize_address(&delete_table_item.handle.to_string()); - // The mapping will tell us if the table item belongs to an inactive pool - if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { - // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool - let inactive_pool_handle = pool_balance.parent_table_handle.clone(); - - let pool_address = match inactive_pool_to_staking_pool - .get(&inactive_pool_handle) - .map(|metadata| metadata.staking_pool_address.clone()) - { - Some(pool_address) => pool_address, - None => Self::get_staking_pool_from_inactive_share_handle( - conn, - &inactive_pool_handle, - query_retries, - query_retry_delay_ms, - ) - .await - .context(format!( - "Failed to get staking pool from inactive share handle {}, txn version {}", - inactive_pool_handle, txn_version - ))?, - }; - let delegator_address = standardize_address(&delete_table_item.key.to_string()); - - return Ok(Some(( - DelegatorBalance { - transaction_version: txn_version, - write_set_change_index, - delegator_address: delegator_address.clone(), - pool_address: pool_address.clone(), - pool_type: "inactive_shares".to_string(), - table_handle: table_handle.clone(), - shares: BigDecimal::zero(), - parent_table_handle: inactive_pool_handle.clone(), - }, - Self { - delegator_address, - pool_address, - pool_type: "inactive_shares".to_string(), - table_handle: table_handle.clone(), - last_transaction_version: txn_version, - shares: BigDecimal::zero(), - parent_table_handle: table_handle, - }, - ))); - } - Ok(None) - } - - /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool - /// Value is the same metadata although it's not really used - pub fn get_active_pool_to_staking_pool_mapping( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( - write_resource, - txn_version, - )? { - Ok(Some(AHashMap::from([( - balance.active_share_table_handle.clone(), - balance, - )]))) - } else { - Ok(None) - } - } - - /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool - /// Value is the same metadata although it's not really used - pub fn get_inactive_pool_to_staking_pool_mapping( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( - write_resource, - txn_version, - )? { - Ok(Some(AHashMap::from([( - balance.inactive_share_table_handle.clone(), - balance, - )]))) - } else { - Ok(None) - } - } - - /// Key is the inactive share table handle obtained from 0x1::pool_u64_unbound::Pool - /// Value is the 0x1::pool_u64_unbound::Pool metadata that will be used to populate a user's inactive balance - pub fn get_inactive_share_to_pool_mapping( - write_table_item: &WriteTableItem, - txn_version: i64, - ) -> anyhow::Result> { - if let Some(balance) = DelegatorPool::get_inactive_pool_metadata_from_write_table_item( - write_table_item, - txn_version, - )? { - Ok(Some(AHashMap::from([( - balance.shares_table_handle.clone(), - balance, - )]))) - } else { - Ok(None) - } - } - - pub async fn get_staking_pool_from_inactive_share_handle( - conn: &mut DbPoolConnection<'_>, - table_handle: &str, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result { - let mut tried = 0; - while tried < query_retries { - tried += 1; - match CurrentDelegatorBalanceQuery::get_by_inactive_share_handle(conn, table_handle) - .await - { - Ok(current_delegator_balance) => return Ok(current_delegator_balance.pool_address), - Err(_) => { - if tried < query_retries { - tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) - .await; - } - }, - } - } - Err(anyhow::anyhow!( - "Failed to get staking pool address from inactive share handle" - )) - } - - pub async fn from_transaction( - transaction: &Transaction, - active_pool_to_staking_pool: &ShareToStakingPoolMapping, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result<(Vec, CurrentDelegatorBalanceMap)> { - let mut inactive_pool_to_staking_pool: ShareToStakingPoolMapping = AHashMap::new(); - let mut inactive_share_to_pool: ShareToPoolMapping = AHashMap::new(); - let mut current_delegator_balances: CurrentDelegatorBalanceMap = AHashMap::new(); - let mut delegator_balances = vec![]; - let txn_version = transaction.version as i64; - let txn_timestamp = parse_timestamp(transaction.timestamp.as_ref().unwrap(), txn_version); - - let changes = &transaction.info.as_ref().unwrap().changes; - // Do a first pass to get the mapping of active_share table handles to staking pool resource let txn_version = transaction.version as i64; - for wsc in changes { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - if let Some(map) = - Self::get_inactive_pool_to_staking_pool_mapping(write_resource, txn_version) - .unwrap() - { - inactive_pool_to_staking_pool.extend(map); - } - } - - if let Change::WriteTableItem(table_item) = wsc.change.as_ref().unwrap() { - if let Some(map) = - Self::get_inactive_share_to_pool_mapping(table_item, txn_version).unwrap() - { - inactive_share_to_pool.extend(map); - } - } - } - // Now make a pass through table items to get the actual delegator balances - for (index, wsc) in changes.iter().enumerate() { - let maybe_delegator_balance = match wsc.change.as_ref().unwrap() { - Change::DeleteTableItem(table_item) => { - if let Some((balance, current_balance)) = - Self::get_active_share_from_delete_table_item( - table_item, - txn_version, - index as i64, - active_pool_to_staking_pool, - ) - .unwrap() - { - Some((balance, current_balance)) - } else { - Self::get_inactive_share_from_delete_table_item( - table_item, - txn_version, - index as i64, - &inactive_pool_to_staking_pool, - &inactive_share_to_pool, - conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - } - }, - Change::WriteTableItem(table_item) => { - if let Some((balance, current_balance)) = - Self::get_active_share_from_write_table_item( - table_item, - txn_version, - index as i64, - active_pool_to_staking_pool, - txn_timestamp, - ) - .await - .unwrap() - { - Some((balance, current_balance)) - } else { - Self::get_inactive_share_from_write_table_item( - table_item, - txn_version, - index as i64, - &inactive_pool_to_staking_pool, - &inactive_share_to_pool, - conn, - query_retries, - query_retry_delay_ms, - txn_timestamp, - ) - .await - .unwrap() - } - }, - _ => None, - }; - if let Some((delegator_balance, current_delegator_balance)) = maybe_delegator_balance { - delegator_balances.push(delegator_balance); - current_delegator_balances.insert( - ( - current_delegator_balance.delegator_address.clone(), - current_delegator_balance.pool_address.clone(), - current_delegator_balance.pool_type.clone(), - ), - current_delegator_balance, - ); - } - } - Ok((delegator_balances, current_delegator_balances)) - } -} - -impl CurrentDelegatorBalanceQuery { - pub async fn get_by_inactive_share_handle( - conn: &mut DbPoolConnection<'_>, - table_handle: &str, - ) -> diesel::QueryResult { - current_delegator_balances::table - .filter(current_delegator_balances::parent_table_handle.eq(table_handle)) - .first::(conn) - .await - } } diff --git a/rust/processor/src/db/postgres/models/stake_models/delegator_pools.rs b/rust/processor/src/db/postgres/models/stake_models/delegator_pools.rs index 6c5613990..93623b499 100644 --- a/rust/processor/src/db/postgres/models/stake_models/delegator_pools.rs +++ b/rust/processor/src/db/postgres/models/stake_models/delegator_pools.rs @@ -4,34 +4,22 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::{StakeResource, StakeTableItem}; use crate::{ - schema::{ - current_delegated_staking_pool_balances, delegated_staking_pool_balances, - delegated_staking_pools, + db::common::models::stake_models::delegator_pools::{ + DelegatorPool, RawCurrentDelegatorPoolBalance, RawCurrentDelegatorPoolBalanceConvertible, + RawDelegatorPoolBalance, RawDelegatorPoolBalanceConvertible, + RawDelegatorPoolBalanceMetadata, RawDelegatorPoolBalanceMetadataConvertible, + RawPoolBalanceMetadata, RawPoolBalanceMetadataConvertible, }, - utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address}, + schema::{current_delegated_staking_pool_balances, delegated_staking_pool_balances}, }; use ahash::AHashMap; -use aptos_protos::transaction::v1::{ - transaction::TxnData, write_set_change::Change, Transaction, WriteResource, WriteTableItem, -}; use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; - type StakingPoolAddress = String; pub type DelegatorPoolMap = AHashMap; -pub type DelegatorPoolBalanceMap = AHashMap; - -// All pools -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] -#[diesel(primary_key(staking_pool_address))] -#[diesel(table_name = delegated_staking_pools)] -pub struct DelegatorPool { - pub staking_pool_address: String, - pub first_transaction_version: i64, -} +pub type DelegatorPoolBalanceMap = AHashMap; // Metadata to fill pool balances and delegator balance #[derive(Debug, Deserialize, Serialize)] @@ -46,6 +34,21 @@ pub struct DelegatorPoolBalanceMetadata { pub inactive_share_table_handle: String, } +impl RawDelegatorPoolBalanceMetadataConvertible for DelegatorPoolBalanceMetadata { + fn from_raw(raw: RawDelegatorPoolBalanceMetadata) -> Self { + Self { + transaction_version: raw.transaction_version, + staking_pool_address: raw.staking_pool_address, + total_coins: raw.total_coins, + total_shares: raw.total_shares, + scaling_factor: raw.scaling_factor, + operator_commission_percentage: raw.operator_commission_percentage, + active_share_table_handle: raw.active_share_table_handle, + inactive_share_table_handle: raw.inactive_share_table_handle, + } + } +} + // Similar metadata but specifically for 0x1::pool_u64_unbound::Pool #[derive(Debug, Deserialize, Serialize)] pub struct PoolBalanceMetadata { @@ -57,6 +60,19 @@ pub struct PoolBalanceMetadata { pub parent_table_handle: String, } +impl RawPoolBalanceMetadataConvertible for PoolBalanceMetadata { + fn from_raw(raw: RawPoolBalanceMetadata) -> Self { + Self { + transaction_version: raw.transaction_version, + total_coins: raw.total_coins, + total_shares: raw.total_shares, + scaling_factor: raw.scaling_factor, + shares_table_handle: raw.shares_table_handle, + parent_table_handle: raw.parent_table_handle, + } + } +} + // Pools balances #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, staking_pool_address))] @@ -71,6 +87,20 @@ pub struct DelegatorPoolBalance { pub active_table_handle: String, } +impl RawDelegatorPoolBalanceConvertible for DelegatorPoolBalance { + fn from_raw(raw: RawDelegatorPoolBalance) -> Self { + Self { + transaction_version: raw.transaction_version, + staking_pool_address: raw.staking_pool_address, + total_coins: raw.total_coins, + total_shares: raw.total_shares, + operator_commission_percentage: raw.operator_commission_percentage, + inactive_table_handle: raw.inactive_table_handle, + active_table_handle: raw.active_table_handle, + } + } +} + // All pools w latest balances (really a more comprehensive version than DelegatorPool) #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(staking_pool_address))] @@ -85,154 +115,16 @@ pub struct CurrentDelegatorPoolBalance { pub active_table_handle: String, } -impl DelegatorPool { - pub fn from_transaction( - transaction: &Transaction, - ) -> anyhow::Result<( - DelegatorPoolMap, - Vec, - DelegatorPoolBalanceMap, - )> { - let mut delegator_pool_map = AHashMap::new(); - let mut delegator_pool_balances = vec![]; - let mut delegator_pool_balances_map = AHashMap::new(); - let txn_data = match transaction.txn_data.as_ref() { - Some(data) => data, - None => { - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["DelegatorPool"]) - .inc(); - tracing::warn!( - transaction_version = transaction.version, - "Transaction data doesn't exist", - ); - return Ok(( - delegator_pool_map, - delegator_pool_balances, - delegator_pool_balances_map, - )); - }, - }; - let txn_version = transaction.version as i64; - - // Do a first pass to get the mapping of active_share table handles to staking pool addresses - if let TxnData::User(_) = txn_data { - let changes = &transaction - .info - .as_ref() - .expect("Transaction info doesn't exist!") - .changes; - for wsc in changes { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - let maybe_write_resource = - Self::from_write_resource(write_resource, txn_version)?; - if let Some((pool, pool_balances, current_pool_balances)) = maybe_write_resource - { - let staking_pool_address = pool.staking_pool_address.clone(); - delegator_pool_map.insert(staking_pool_address.clone(), pool); - delegator_pool_balances.push(pool_balances); - delegator_pool_balances_map - .insert(staking_pool_address.clone(), current_pool_balances); - } - } - } - } - Ok(( - delegator_pool_map, - delegator_pool_balances, - delegator_pool_balances_map, - )) - } - - pub fn get_delegated_pool_metadata_from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - if let Some(StakeResource::DelegationPool(inner)) = - StakeResource::from_write_resource(write_resource, txn_version)? - { - let staking_pool_address = standardize_address(&write_resource.address.to_string()); - let total_coins = inner.active_shares.total_coins; - let total_shares = - &inner.active_shares.total_shares / &inner.active_shares.scaling_factor; - Ok(Some(DelegatorPoolBalanceMetadata { - transaction_version: txn_version, - staking_pool_address, - total_coins, - total_shares, - scaling_factor: inner.active_shares.scaling_factor, - operator_commission_percentage: inner.operator_commission_percentage.clone(), - active_share_table_handle: inner.active_shares.shares.inner.get_handle(), - inactive_share_table_handle: inner.inactive_shares.get_handle(), - })) - } else { - Ok(None) - } - } - - pub fn get_inactive_pool_metadata_from_write_table_item( - write_table_item: &WriteTableItem, - txn_version: i64, - ) -> anyhow::Result> { - let table_item_data = write_table_item.data.as_ref().unwrap(); - - if let Some(StakeTableItem::Pool(inner)) = &StakeTableItem::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? { - let total_coins = inner.total_coins.clone(); - let total_shares = &inner.total_shares / &inner.scaling_factor; - Ok(Some(PoolBalanceMetadata { - transaction_version: txn_version, - total_coins, - total_shares, - scaling_factor: inner.scaling_factor.clone(), - shares_table_handle: inner.shares.inner.get_handle(), - parent_table_handle: standardize_address(&write_table_item.handle.to_string()), - })) - } else { - Ok(None) - } - } - - pub fn from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - if let Some(balance) = - &Self::get_delegated_pool_metadata_from_write_resource(write_resource, txn_version)? - { - let staking_pool_address = balance.staking_pool_address.clone(); - let total_coins = balance.total_coins.clone(); - let total_shares = balance.total_shares.clone(); - let transaction_version = balance.transaction_version; - Ok(Some(( - Self { - staking_pool_address: staking_pool_address.clone(), - first_transaction_version: transaction_version, - }, - DelegatorPoolBalance { - transaction_version, - staking_pool_address: staking_pool_address.clone(), - total_coins: total_coins.clone(), - total_shares: total_shares.clone(), - operator_commission_percentage: balance.operator_commission_percentage.clone(), - inactive_table_handle: balance.inactive_share_table_handle.clone(), - active_table_handle: balance.active_share_table_handle.clone(), - }, - CurrentDelegatorPoolBalance { - staking_pool_address, - total_coins, - total_shares, - last_transaction_version: transaction_version, - operator_commission_percentage: balance.operator_commission_percentage.clone(), - inactive_table_handle: balance.inactive_share_table_handle.clone(), - active_table_handle: balance.active_share_table_handle.clone(), - }, - ))) - } else { - Ok(None) +impl RawCurrentDelegatorPoolBalanceConvertible for CurrentDelegatorPoolBalance { + fn from_raw(raw: RawCurrentDelegatorPoolBalance) -> Self { + Self { + staking_pool_address: raw.staking_pool_address, + total_coins: raw.total_coins, + total_shares: raw.total_shares, + last_transaction_version: raw.last_transaction_version, + operator_commission_percentage: raw.operator_commission_percentage, + inactive_table_handle: raw.inactive_table_handle, + active_table_handle: raw.active_table_handle, } } } diff --git a/rust/processor/src/db/postgres/models/stake_models/mod.rs b/rust/processor/src/db/postgres/models/stake_models/mod.rs index 75db7e273..a68f8ca71 100644 --- a/rust/processor/src/db/postgres/models/stake_models/mod.rs +++ b/rust/processor/src/db/postgres/models/stake_models/mod.rs @@ -1,10 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -pub mod current_delegated_voter; pub mod delegator_activities; pub mod delegator_balances; pub mod delegator_pools; pub mod proposal_votes; -pub mod stake_utils; pub mod staking_pool_voter; diff --git a/rust/processor/src/db/postgres/models/stake_models/proposal_votes.rs b/rust/processor/src/db/postgres/models/stake_models/proposal_votes.rs index 0540afb5e..d171f9977 100644 --- a/rust/processor/src/db/postgres/models/stake_models/proposal_votes.rs +++ b/rust/processor/src/db/postgres/models/stake_models/proposal_votes.rs @@ -4,15 +4,12 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::StakeEvent; use crate::{ - schema::proposal_votes, - utils::{ - counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - util::{parse_timestamp, standardize_address}, + db::common::models::stake_models::proposal_voters::{ + RawProposalVote, RawProposalVoteConvertible, }, + schema::proposal_votes, }; -use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -30,44 +27,16 @@ pub struct ProposalVote { pub transaction_timestamp: chrono::NaiveDateTime, } -impl ProposalVote { - pub fn from_transaction(transaction: &Transaction) -> anyhow::Result> { - let mut proposal_votes = vec![]; - let txn_data = match transaction.txn_data.as_ref() { - Some(data) => data, - None => { - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["ProposalVote"]) - .inc(); - tracing::warn!( - transaction_version = transaction.version, - "Transaction data doesn't exist", - ); - return Ok(proposal_votes); - }, - }; - let txn_version = transaction.version as i64; - - if let TxnData::User(user_txn) = txn_data { - for event in user_txn.events.iter() { - if let Some(StakeEvent::GovernanceVoteEvent(ev)) = - StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? - { - proposal_votes.push(Self { - transaction_version: txn_version, - proposal_id: ev.proposal_id as i64, - voter_address: standardize_address(&ev.voter), - staking_pool_address: standardize_address(&ev.stake_pool), - num_votes: ev.num_votes.clone(), - should_pass: ev.should_pass, - transaction_timestamp: parse_timestamp( - transaction.timestamp.as_ref().unwrap(), - txn_version, - ), - }); - } - } +impl RawProposalVoteConvertible for ProposalVote { + fn from_raw(raw: RawProposalVote) -> Self { + Self { + transaction_version: raw.transaction_version, + proposal_id: raw.proposal_id, + voter_address: raw.voter_address, + staking_pool_address: raw.staking_pool_address, + num_votes: raw.num_votes, + should_pass: raw.should_pass, + transaction_timestamp: raw.transaction_timestamp, } - Ok(proposal_votes) } } diff --git a/rust/processor/src/db/postgres/models/stake_models/staking_pool_voter.rs b/rust/processor/src/db/postgres/models/stake_models/staking_pool_voter.rs index 67ecf4d45..93a72aca5 100644 --- a/rust/processor/src/db/postgres/models/stake_models/staking_pool_voter.rs +++ b/rust/processor/src/db/postgres/models/stake_models/staking_pool_voter.rs @@ -4,10 +4,13 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::StakeResource; -use crate::{schema::current_staking_pool_voter, utils::util::standardize_address}; +use crate::{ + db::common::models::stake_models::staking_pool_voter::{ + RawCurrentStakingPoolVoter, RawCurrentStakingPoolVoterConvertible, + }, + schema::current_staking_pool_voter, +}; use ahash::AHashMap; -use aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -24,28 +27,13 @@ pub struct CurrentStakingPoolVoter { pub operator_address: String, } -impl CurrentStakingPoolVoter { - pub fn from_transaction(transaction: &Transaction) -> anyhow::Result { - let mut staking_pool_voters = AHashMap::new(); - - let txn_version = transaction.version as i64; - for wsc in &transaction.info.as_ref().unwrap().changes { - if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { - if let Some(StakeResource::StakePool(inner)) = - StakeResource::from_write_resource(write_resource, txn_version)? - { - let staking_pool_address = - standardize_address(&write_resource.address.to_string()); - staking_pool_voters.insert(staking_pool_address.clone(), Self { - staking_pool_address, - voter_address: inner.get_delegated_voter(), - last_transaction_version: txn_version, - operator_address: inner.get_operator_address(), - }); - } - } +impl RawCurrentStakingPoolVoterConvertible for CurrentStakingPoolVoter { + fn from_raw(raw: RawCurrentStakingPoolVoter) -> Self { + Self { + staking_pool_address: raw.staking_pool_address, + voter_address: raw.voter_address, + last_transaction_version: raw.last_transaction_version, + operator_address: raw.operator_address, } - - Ok(staking_pool_voters) } } diff --git a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index 128be89ed..b5b683d3c 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -3,18 +3,35 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ - db::postgres::models::stake_models::{ - current_delegated_voter::CurrentDelegatedVoter, - delegator_activities::DelegatedStakingActivity, - delegator_balances::{ - CurrentDelegatorBalance, CurrentDelegatorBalanceMap, DelegatorBalance, + db::{ + common::models::stake_models::{ + current_delegated_voter::CurrentDelegatedVoter, + delegator_activities::{ + RawDelegatedStakingActivity, RawDelegatedStakingActivityConvertible, + }, + delegator_balances::{ + RawCurrentDelegatorBalance, RawCurrentDelegatorBalanceConvertible, + RawCurrentDelegatorBalanceMap, RawDelegatorBalance, RawDelegatorBalanceConvertible, + }, + delegator_pools::{ + DelegatorPool, DelegatorPoolMap, RawCurrentDelegatorPoolBalance, + RawCurrentDelegatorPoolBalanceConvertible, RawDelegatorPoolBalance, + RawDelegatorPoolBalanceConvertible, + }, + proposal_voters::{RawProposalVote, RawProposalVoteConvertible}, + stake_utils::DelegationVoteGovernanceRecordsResource, + staking_pool_voter::{ + RawCurrentStakingPoolVoter, RawCurrentStakingPoolVoterConvertible, + StakingPoolRawVoterMap, + }, }, - delegator_pools::{ - CurrentDelegatorPoolBalance, DelegatorPool, DelegatorPoolBalance, DelegatorPoolMap, + postgres::models::stake_models::{ + delegator_activities::DelegatedStakingActivity, + delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + delegator_pools::{CurrentDelegatorPoolBalance, DelegatorPoolBalance}, + proposal_votes::ProposalVote, + staking_pool_voter::CurrentStakingPoolVoter, }, - proposal_votes::ProposalVote, - stake_utils::DelegationVoteGovernanceRecordsResource, - staking_pool_voter::{CurrentStakingPoolVoter, StakingPoolVoterMap}, }, gap_detectors::ProcessingResult, schema, @@ -384,28 +401,28 @@ pub fn insert_current_delegated_voter_query( pub async fn parse_stake_data( transactions: &Vec, - mut conn: DbPoolConnection<'_>, + mut conn: Option>, query_retries: u32, query_retry_delay_ms: u64, ) -> Result< ( - Vec, - Vec, - Vec, - Vec, - Vec, + Vec, + Vec, + Vec, + Vec, + Vec, Vec, - Vec, - Vec, + Vec, + Vec, Vec, ), anyhow::Error, > { - let mut all_current_stake_pool_voters: StakingPoolVoterMap = AHashMap::new(); + let mut all_current_stake_pool_voters: StakingPoolRawVoterMap = AHashMap::new(); let mut all_proposal_votes = vec![]; let mut all_delegator_activities = vec![]; let mut all_delegator_balances = vec![]; - let mut all_current_delegator_balances: CurrentDelegatorBalanceMap = AHashMap::new(); + let mut all_current_delegator_balances: RawCurrentDelegatorBalanceMap = AHashMap::new(); let mut all_delegator_pools: DelegatorPoolMap = AHashMap::new(); let mut all_delegator_pool_balances = vec![]; let mut all_current_delegator_pool_balances = AHashMap::new(); @@ -417,13 +434,13 @@ pub async fn parse_stake_data( for txn in transactions { // Add votes data - let current_stake_pool_voter = CurrentStakingPoolVoter::from_transaction(txn).unwrap(); + let current_stake_pool_voter = RawCurrentStakingPoolVoter::from_transaction(txn).unwrap(); all_current_stake_pool_voters.extend(current_stake_pool_voter); - let mut proposal_votes = ProposalVote::from_transaction(txn).unwrap(); + let mut proposal_votes = RawProposalVote::from_transaction(txn).unwrap(); all_proposal_votes.append(&mut proposal_votes); // Add delegator activities - let mut delegator_activities = DelegatedStakingActivity::from_transaction(txn).unwrap(); + let mut delegator_activities = RawDelegatedStakingActivity::from_transaction(txn).unwrap(); all_delegator_activities.append(&mut delegator_activities); // Add delegator pools @@ -455,67 +472,71 @@ pub async fn parse_stake_data( all_vote_delegation_handle_to_pool_address .insert(vote_delegation_handle, delegation_pool_address.clone()); } - if let Some(map) = CurrentDelegatorBalance::get_active_pool_to_staking_pool_mapping( - write_resource, - txn_version, - ) - .unwrap() + if let Some(map) = + RawCurrentDelegatorBalance::get_active_pool_to_staking_pool_mapping( + write_resource, + txn_version, + ) + .unwrap() { active_pool_to_staking_pool.extend(map); } } } - // Add delegator balances - let (mut delegator_balances, current_delegator_balances) = - CurrentDelegatorBalance::from_transaction( - txn, - &active_pool_to_staking_pool, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap(); - all_delegator_balances.append(&mut delegator_balances); - all_current_delegator_balances.extend(current_delegator_balances); - - // this write table item indexing is to get delegator address, table handle, and voter & pending voter - for wsc in &transaction_info.changes { - if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { - let voter_map = CurrentDelegatedVoter::from_write_table_item( - write_table_item, - txn_version, - txn_timestamp, - &all_vote_delegation_handle_to_pool_address, - &mut conn, + if let Some(ref mut conn) = conn { + // Add delegator balances + let (mut delegator_balances, current_delegator_balances) = + RawCurrentDelegatorBalance::from_transaction( + txn, + &active_pool_to_staking_pool, + conn, query_retries, query_retry_delay_ms, ) .await .unwrap(); - - all_current_delegated_voter.extend(voter_map); + all_delegator_balances.append(&mut delegator_balances); + all_current_delegator_balances.extend(current_delegator_balances); + + // this write table item indexing is to get delegator address, table handle, and voter & pending voter + for wsc in &transaction_info.changes { + if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { + let voter_map = CurrentDelegatedVoter::from_write_table_item( + write_table_item, + txn_version, + txn_timestamp, + &all_vote_delegation_handle_to_pool_address, + conn, + query_retries, + query_retry_delay_ms, + ) + .await + .unwrap(); + + all_current_delegated_voter.extend(voter_map); + } } - } - // we need one last loop to prefill delegators that got in before the delegated voting contract was deployed - for wsc in &transaction_info.changes { - if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { - if let Some(voter) = CurrentDelegatedVoter::get_delegators_pre_contract_deployment( - write_table_item, - txn_version, - txn_timestamp, - &active_pool_to_staking_pool, - &all_current_delegated_voter, - &mut conn, - query_retries, - query_retry_delay_ms, - ) - .await - .unwrap() - { - all_current_delegated_voter.insert(voter.pk(), voter); + // we need one last loop to prefill delegators that got in before the delegated voting contract was deployed + for wsc in &transaction_info.changes { + if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { + if let Some(voter) = + CurrentDelegatedVoter::get_delegators_pre_contract_deployment( + write_table_item, + txn_version, + txn_timestamp, + &active_pool_to_staking_pool, + &all_current_delegated_voter, + conn, + query_retries, + query_retry_delay_ms, + ) + .await + .unwrap() + { + all_current_delegated_voter.insert(voter.pk(), voter); + } } } } @@ -524,16 +545,16 @@ pub async fn parse_stake_data( // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes let mut all_current_stake_pool_voters = all_current_stake_pool_voters .into_values() - .collect::>(); + .collect::>(); let mut all_current_delegator_balances = all_current_delegator_balances .into_values() - .collect::>(); + .collect::>(); let mut all_delegator_pools = all_delegator_pools .into_values() .collect::>(); let mut all_current_delegator_pool_balances = all_current_delegator_pool_balances .into_values() - .collect::>(); + .collect::>(); let mut all_current_delegated_voter = all_current_delegated_voter .into_values() .collect::>(); @@ -588,16 +609,23 @@ impl ProcessorTrait for StakeProcessor { let query_retry_delay_ms = self.config.query_retry_delay_ms; let ( - all_current_stake_pool_voters, - all_proposal_votes, - all_delegator_activities, - all_delegator_balances, - all_current_delegator_balances, + raw_all_current_stake_pool_voters, + raw_all_proposal_votes, + raw_all_delegator_activities, + raw_all_delegator_balances, + raw_all_current_delegator_balances, all_delegator_pools, - all_delegator_pool_balances, - all_current_delegator_pool_balances, + raw_all_delegator_pool_balances, + raw_all_current_delegator_pool_balances, all_current_delegated_voter, - ) = match parse_stake_data(&transactions, conn, query_retries, query_retry_delay_ms).await { + ) = match parse_stake_data( + &transactions, + Some(conn), + query_retries, + query_retry_delay_ms, + ) + .await + { Ok(data) => data, Err(e) => { error!( @@ -610,6 +638,34 @@ impl ProcessorTrait for StakeProcessor { bail!(e) }, }; + let all_delegator_balances: Vec = raw_all_delegator_balances + .into_iter() + .map(DelegatorBalance::from_raw) + .collect::>(); + let all_current_delegator_balances = raw_all_current_delegator_balances + .into_iter() + .map(CurrentDelegatorBalance::from_raw) + .collect::>(); + let all_delegator_pool_balances = raw_all_delegator_pool_balances + .into_iter() + .map(DelegatorPoolBalance::from_raw) + .collect::>(); + let all_current_delegator_pool_balances = raw_all_current_delegator_pool_balances + .into_iter() + .map(CurrentDelegatorPoolBalance::from_raw) + .collect::>(); + let all_delegator_activities = raw_all_delegator_activities + .into_iter() + .map(DelegatedStakingActivity::from_raw) + .collect::>(); + let all_proposal_votes = raw_all_proposal_votes + .into_iter() + .map(ProposalVote::from_raw) + .collect::>(); + let all_current_stake_pool_voters = raw_all_current_stake_pool_voters + .into_iter() + .map(CurrentStakingPoolVoter::from_raw) + .collect::>(); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 23d8496e7..26325c834 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use super::database::DbPoolConnection; use crate::{ db::postgres::models::property_map::{PropertyMap, TokenObjectPropertyMap}, utils::counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -39,6 +40,13 @@ lazy_static! { pub static ref APT_METADATA_ADDRESS_HEX: String = format!("0x{}", hex::encode(*APT_METADATA_ADDRESS_RAW)); } + +pub struct DbConnectionConfig<'a> { + pub conn: DbPoolConnection<'a>, + pub query_retries: u32, + pub query_retry_delay_ms: u64, +} + // Supporting structs to get clean payload without escaped strings #[derive(Debug, Deserialize, Serialize)] pub struct EntryFunctionPayloadClean { diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index fc8a20916..94009c03f 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -10,6 +10,7 @@ use crate::{ parquet_events_processor::ParquetEventsProcessor, parquet_fungible_asset_processor::ParquetFungibleAssetProcessor, parquet_objects_processor::ParquetObjectsProcessor, + parquet_stake_processor::ParquetStakeProcessor, parquet_token_v2_processor::ParquetTokenV2Processor, parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor, parquet_user_transaction_processor::ParquetUserTransactionsProcessor, @@ -123,6 +124,10 @@ impl RunnableConfig for IndexerProcessorConfig { let parquet_ans_processor = ParquetAnsProcessor::new(self.clone()).await?; parquet_ans_processor.run_processor().await }, + ProcessorConfig::ParquetStakeProcessor(_) => { + let parquet_stake_processor = ParquetStakeProcessor::new(self.clone()).await?; + parquet_stake_processor.run_processor().await + }, ProcessorConfig::ParquetObjectsProcessor(_) => { let parquet_objects_processor = ParquetObjectsProcessor::new(self.clone()).await?; parquet_objects_processor.run_processor().await diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index ffbabfcc1..9e554f141 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -33,6 +33,11 @@ use processor::{ parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, object_models::v2_objects::{CurrentObject, Object}, + stake_models::{ + parquet_delegator_activities::DelegatedStakingActivity, + parquet_delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + parquet_proposal_voters::ProposalVote, + }, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -101,6 +106,7 @@ pub enum ProcessorConfig { ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig), ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig), ParquetTokenV2Processor(ParquetDefaultProcessorConfig), + ParquetStakeProcessor(ParquetDefaultProcessorConfig), ParquetObjectsProcessor(ParquetDefaultProcessorConfig), } @@ -123,6 +129,7 @@ impl ProcessorConfig { | ProcessorConfig::ParquetTransactionMetadataProcessor(config) | ProcessorConfig::ParquetAccountTransactionsProcessor(config) | ProcessorConfig::ParquetTokenV2Processor(config) + | ProcessorConfig::ParquetStakeProcessor(config) | ProcessorConfig::ParquetObjectsProcessor(config) | ProcessorConfig::ParquetFungibleAssetProcessor(config) => config, ProcessorConfig::ParquetAnsProcessor(config) => &config.default, @@ -205,6 +212,12 @@ impl ProcessorConfig { Object::TABLE_NAME.to_string(), CurrentObject::TABLE_NAME.to_string(), ]), + ProcessorName::ParquetStakeProcessor => HashSet::from([ + DelegatedStakingActivity::TABLE_NAME.to_string(), + ProposalVote::TABLE_NAME.to_string(), + DelegatorBalance::TABLE_NAME.to_string(), + CurrentDelegatorBalance::TABLE_NAME.to_string(), + ]), _ => HashSet::new(), // Default case for unsupported processors } } diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index b3028c09a..d44c875b3 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -37,6 +37,11 @@ use processor::{ parquet_v2_fungible_metadata::FungibleAssetMetadataModel, }, object_models::v2_objects::{CurrentObject, Object}, + stake_models::{ + parquet_delegator_activities::DelegatedStakingActivity, + parquet_delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + parquet_proposal_voters::ProposalVote, + }, token_v2_models::{ token_claims::CurrentTokenPendingClaim, v1_token_royalty::CurrentTokenRoyaltyV1, @@ -65,6 +70,7 @@ pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; pub mod parquet_objects_processor; +pub mod parquet_stake_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; @@ -131,6 +137,11 @@ pub enum ParquetTypeEnum { CurrentTokenDatasV2, TokenOwnershipsV2, CurrentTokenOwnershipsV2, + // stake + DelegatedStakingActivities, + CurrentDelegatorBalances, + DelegatorBalances, + ProposalVotes, // Objects Objects, CurrentObjects, @@ -238,6 +249,16 @@ impl_parquet_trait!( CurrentTokenOwnershipV2, ParquetTypeEnum::CurrentTokenOwnershipsV2 ); +impl_parquet_trait!( + DelegatedStakingActivity, + ParquetTypeEnum::DelegatedStakingActivities +); +impl_parquet_trait!( + CurrentDelegatorBalance, + ParquetTypeEnum::CurrentDelegatorBalances +); +impl_parquet_trait!(DelegatorBalance, ParquetTypeEnum::DelegatorBalances); +impl_parquet_trait!(ProposalVote, ParquetTypeEnum::ProposalVotes); impl_parquet_trait!(Object, ParquetTypeEnum::Objects); impl_parquet_trait!(CurrentObject, ParquetTypeEnum::CurrentObjects); #[derive(Debug, Clone)] @@ -280,6 +301,12 @@ pub enum ParquetTypeStructs { CurrentTokenDataV2(Vec), TokenOwnershipV2(Vec), CurrentTokenOwnershipV2(Vec), + // Stake + DelegatedStakingActivity(Vec), + CurrentDelegatorBalance(Vec), + DelegatorBalance(Vec), + ProposalVote(Vec), + // Objects Object(Vec), CurrentObject(Vec), } @@ -344,6 +371,14 @@ impl ParquetTypeStructs { ParquetTypeEnum::CurrentTokenOwnershipsV2 => { ParquetTypeStructs::CurrentTokenOwnershipV2(Vec::new()) }, + ParquetTypeEnum::DelegatedStakingActivities => { + ParquetTypeStructs::DelegatedStakingActivity(Vec::new()) + }, + ParquetTypeEnum::CurrentDelegatorBalances => { + ParquetTypeStructs::CurrentDelegatorBalance(Vec::new()) + }, + ParquetTypeEnum::DelegatorBalances => ParquetTypeStructs::DelegatorBalance(Vec::new()), + ParquetTypeEnum::ProposalVotes => ParquetTypeStructs::ProposalVote(Vec::new()), ParquetTypeEnum::Objects => ParquetTypeStructs::Object(Vec::new()), ParquetTypeEnum::CurrentObjects => ParquetTypeStructs::CurrentObject(Vec::new()), } @@ -531,6 +566,30 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + ( + ParquetTypeStructs::DelegatedStakingActivity(self_data), + ParquetTypeStructs::DelegatedStakingActivity(other_data), + ) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::CurrentDelegatorBalance(self_data), + ParquetTypeStructs::CurrentDelegatorBalance(other_data), + ) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::DelegatorBalance(self_data), + ParquetTypeStructs::DelegatorBalance(other_data), + ) => { + handle_append!(self_data, other_data) + }, + ( + ParquetTypeStructs::ProposalVote(self_data), + ParquetTypeStructs::ProposalVote(other_data), + ) => { + handle_append!(self_data, other_data) + }, (ParquetTypeStructs::Object(self_data), ParquetTypeStructs::Object(other_data)) => { handle_append!(self_data, other_data) }, diff --git a/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs new file mode 100644 index 000000000..10b3dc61c --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs @@ -0,0 +1,192 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_stake_processor::parquet_stake_extractor::ParquetStakeExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::stake_models::{ + parquet_delegator_activities::DelegatedStakingActivity, + parquet_delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + parquet_proposal_voters::ProposalVote, + }, +}; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetStakeProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl ParquetStakeProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for ParquetStakeProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetStakeProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetStakeProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetStakeProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_stake_extractor = ParquetStakeExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap> = [ + ( + ParquetTypeEnum::DelegatedStakingActivities, + DelegatedStakingActivity::schema(), + ), + (ParquetTypeEnum::ProposalVotes, ProposalVote::schema()), + ( + ParquetTypeEnum::DelegatorBalances, + DelegatorBalance::schema(), + ), + ( + ParquetTypeEnum::CurrentDelegatorBalances, + CurrentDelegatorBalance::schema(), + ), + ] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_stake_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index de856c9f5..340a42644 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -16,8 +16,8 @@ pub mod parquet_default_processor; pub mod parquet_events_processor; pub mod parquet_fungible_asset_processor; pub mod parquet_objects_processor; +pub mod parquet_stake_processor; pub mod parquet_token_v2_processor; pub mod parquet_transaction_metadata_processor; pub mod parquet_user_transaction_processor; - pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64; diff --git a/rust/sdk-processor/src/steps/parquet_stake_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_stake_processor/mod.rs new file mode 100644 index 000000000..cb40663e9 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_stake_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_stake_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_stake_processor/parquet_stake_extractor.rs b/rust/sdk-processor/src/steps/parquet_stake_processor/parquet_stake_extractor.rs new file mode 100644 index 000000000..fe0494f31 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_stake_processor/parquet_stake_extractor.rs @@ -0,0 +1,150 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::{ + common::models::stake_models::{ + delegator_activities::RawDelegatedStakingActivityConvertible, + delegator_balances::{ + RawCurrentDelegatorBalanceConvertible, RawDelegatorBalanceConvertible, + }, + proposal_voters::RawProposalVoteConvertible, + }, + parquet::models::stake_models::{ + parquet_delegator_activities::DelegatedStakingActivity, + parquet_delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + parquet_proposal_voters::ProposalVote, + }, + }, + processors::stake_processor::parse_stake_data, + utils::table_flags::TableFlags, +}; +use std::collections::HashMap; +use tracing::{debug, error}; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetStakeExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetStakeExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let ( + _, + raw_all_proposal_votes, + raw_all_delegator_activities, + raw_all_delegator_balances, + raw_all_current_delegator_balances, + _, + _, + _, + _, + ) = match parse_stake_data(&transactions.data, None, 0, 0).await { + Ok(data) => data, + Err(e) => { + error!( + start_version = transactions.metadata.start_version, + end_version = transactions.metadata.end_version, + processor_name = self.name(), + error = ?e, + "[Parser] Error parsing stake data", + ); + return Err(ProcessorError::ProcessError { + message: format!("Error parsing stake data: {:?}", e), + }); + }, + }; + + let all_delegator_activities = raw_all_delegator_activities + .into_iter() + .map(DelegatedStakingActivity::from_raw) + .collect::>(); + let all_delegator_balances: Vec = raw_all_delegator_balances + .into_iter() + .map(DelegatorBalance::from_raw) + .collect::>(); + let all_current_delegator_balances = raw_all_current_delegator_balances + .into_iter() + .map(CurrentDelegatorBalance::from_raw) + .collect::>(); + let all_proposal_votes = raw_all_proposal_votes + .into_iter() + .map(ProposalVote::from_raw) + .collect::>(); + + // Print the size of each extracted data type + debug!("Processed data sizes:"); + debug!( + " - DelegatedStakingActivity: {}", + all_delegator_activities.len() + ); + debug!(" - ProposalVote: {}", all_proposal_votes.len()); + debug!(" - DelegatorBalance: {}", all_delegator_balances.len()); + debug!( + " - CurrentDelegatorBalance: {}", + all_current_delegator_balances.len() + ); + + let mut map: HashMap = HashMap::new(); + + let data_types = [ + ( + TableFlags::DELEGATED_STAKING_ACTIVITIES, + ParquetTypeEnum::DelegatedStakingActivities, + ParquetTypeStructs::DelegatedStakingActivity(all_delegator_activities), + ), + ( + TableFlags::PROPOSAL_VOTES, + ParquetTypeEnum::ProposalVotes, + ParquetTypeStructs::ProposalVote(all_proposal_votes), + ), + ( + TableFlags::DELEGATOR_BALANCES, + ParquetTypeEnum::DelegatorBalances, + ParquetTypeStructs::DelegatorBalance(all_delegator_balances), + ), + ( + TableFlags::CURRENT_DELEGATOR_BALANCES, + ParquetTypeEnum::CurrentDelegatorBalances, + ParquetTypeStructs::CurrentDelegatorBalance(all_current_delegator_balances), + ), + ]; + + // Populate the map based on opt-in tables + add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec()); + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetStakeExtractor {} + +impl NamedStep for ParquetStakeExtractor { + fn name(&self) -> String { + "ParquetStakeExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/stake_processor/stake_extractor.rs b/rust/sdk-processor/src/steps/stake_processor/stake_extractor.rs index 5228136a2..0e368e945 100644 --- a/rust/sdk-processor/src/steps/stake_processor/stake_extractor.rs +++ b/rust/sdk-processor/src/steps/stake_processor/stake_extractor.rs @@ -7,13 +7,27 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::{ - db::postgres::models::stake_models::{ - current_delegated_voter::CurrentDelegatedVoter, - delegator_activities::DelegatedStakingActivity, - delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, - delegator_pools::{CurrentDelegatorPoolBalance, DelegatorPool, DelegatorPoolBalance}, - proposal_votes::ProposalVote, - staking_pool_voter::CurrentStakingPoolVoter, + db::{ + common::models::stake_models::{ + current_delegated_voter::CurrentDelegatedVoter, + delegator_activities::RawDelegatedStakingActivityConvertible, + delegator_balances::{ + RawCurrentDelegatorBalanceConvertible, RawDelegatorBalanceConvertible, + }, + delegator_pools::{ + DelegatorPool, RawCurrentDelegatorPoolBalanceConvertible, + RawDelegatorPoolBalanceConvertible, + }, + proposal_voters::RawProposalVoteConvertible, + staking_pool_voter::RawCurrentStakingPoolVoterConvertible, + }, + postgres::models::stake_models::{ + delegator_activities::DelegatedStakingActivity, + delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + delegator_pools::{CurrentDelegatorPoolBalance, DelegatorPoolBalance}, + proposal_votes::ProposalVote, + staking_pool_voter::CurrentStakingPoolVoter, + }, }, processors::stake_processor::parse_stake_data, }; @@ -89,18 +103,18 @@ impl Processable for StakeExtractor { })?; let ( - all_current_stake_pool_voters, - all_proposal_votes, - all_delegator_activities, - all_delegator_balances, - all_current_delegator_balances, + raw_all_current_stake_pool_voters, + raw_all_proposal_votes, + raw_all_delegator_activities, + raw_all_delegator_balances, + raw_all_current_delegator_balances, all_delegator_pools, - all_delegator_pool_balances, - all_current_delegator_pool_balances, + raw_all_delegator_pool_balances, + raw_all_current_delegator_pool_balances, all_current_delegated_voter, ) = match parse_stake_data( &transactions.data, - conn, + Some(conn), self.query_retries, self.query_retry_delay_ms, ) @@ -121,6 +135,35 @@ impl Processable for StakeExtractor { }, }; + let all_delegator_balances: Vec = raw_all_delegator_balances + .into_iter() + .map(DelegatorBalance::from_raw) + .collect::>(); + let all_current_delegator_balances = raw_all_current_delegator_balances + .into_iter() + .map(CurrentDelegatorBalance::from_raw) + .collect::>(); + let all_delegator_pool_balances = raw_all_delegator_pool_balances + .into_iter() + .map(DelegatorPoolBalance::from_raw) + .collect::>(); + let all_current_delegator_pool_balances = raw_all_current_delegator_pool_balances + .into_iter() + .map(CurrentDelegatorPoolBalance::from_raw) + .collect::>(); + let all_delegator_activities = raw_all_delegator_activities + .into_iter() + .map(DelegatedStakingActivity::from_raw) + .collect::>(); + let all_proposal_votes = raw_all_proposal_votes + .into_iter() + .map(ProposalVote::from_raw) + .collect::>(); + let all_current_stake_pool_voters = raw_all_current_stake_pool_voters + .into_iter() + .map(CurrentStakingPoolVoter::from_raw) + .collect::>(); + Ok(Some(TransactionContext { data: ( all_current_stake_pool_voters, diff --git a/rust/sdk-processor/src/steps/stake_processor/stake_storer.rs b/rust/sdk-processor/src/steps/stake_processor/stake_storer.rs index 736c0c648..4fface33e 100644 --- a/rust/sdk-processor/src/steps/stake_processor/stake_storer.rs +++ b/rust/sdk-processor/src/steps/stake_processor/stake_storer.rs @@ -11,13 +11,17 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::{ - db::postgres::models::stake_models::{ - current_delegated_voter::CurrentDelegatedVoter, - delegator_activities::DelegatedStakingActivity, - delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, - delegator_pools::{CurrentDelegatorPoolBalance, DelegatorPool, DelegatorPoolBalance}, - proposal_votes::ProposalVote, - staking_pool_voter::CurrentStakingPoolVoter, + db::{ + common::models::stake_models::{ + current_delegated_voter::CurrentDelegatedVoter, delegator_pools::DelegatorPool, + }, + postgres::models::stake_models::{ + delegator_activities::DelegatedStakingActivity, + delegator_balances::{CurrentDelegatorBalance, DelegatorBalance}, + delegator_pools::{CurrentDelegatorPoolBalance, DelegatorPoolBalance}, + proposal_votes::ProposalVote, + staking_pool_voter::CurrentStakingPoolVoter, + }, }, processors::stake_processor::{ insert_current_delegated_voter_query, insert_current_delegator_balances_query,