Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Staking] Add indexing support for delegation pool allowlist #482

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS delegated_staking_pool_allowlist;
DROP TABLE IF EXISTS current_delegated_staking_pool_allowlist;
ALTER TABLE delegated_staking_pools DROP COLUMN IF EXISTS allowlist_enabled;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Your SQL goes here
ALTER TABLE delegated_staking_pools
ADD COLUMN IF NOT EXISTS allowlist_enabled BOOLEAN NOT NULL DEFAULT FALSE;

CREATE TABLE IF NOT EXISTS current_delegated_staking_pool_allowlist (
staking_pool_address VARCHAR(66) NOT NULL,
delegator_address VARCHAR(66) NOT NULL,
-- Used for soft delete. On chain, it's a delete operation.
is_allowed BOOLEAN NOT NULL DEFAULT FALSE,
last_transaction_version BIGINT NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (delegator_address, staking_pool_address)
);

CREATE TABLE IF NOT EXISTS delegated_staking_pool_allowlist (
staking_pool_address VARCHAR(66) NOT NULL,
delegator_address VARCHAR(66) NOT NULL,
-- Used for soft delete. On chain, it's a delete operation.
is_allowed BOOLEAN NOT NULL DEFAULT FALSE,
transaction_version BIGINT NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (transaction_version, delegator_address, staking_pool_address)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::stake_utils::StakeEvent;
use crate::{
schema::{current_delegated_staking_pool_allowlist, delegated_staking_pool_allowlist},
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{transaction::TxnData, Transaction};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(delegator_address, staking_pool_address))]
#[diesel(table_name = current_delegated_staking_pool_allowlist)]
pub struct CurrentDelegatedStakingPoolAllowlist {
pub staking_pool_address: String,
pub delegator_address: String,
pub is_allowed: bool,
last_transaction_version: i64,
}

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, delegator_address, staking_pool_address))]
#[diesel(table_name = delegated_staking_pool_allowlist)]
pub struct DelegatedStakingPoolAllowlist {
pub staking_pool_address: String,
pub delegator_address: String,
pub is_allowed: bool,
transaction_version: i64,
}

impl CurrentDelegatedStakingPoolAllowlist {
pub fn from_transaction(
transaction: &Transaction,
) -> anyhow::Result<AHashMap<(String, String), Self>> {
let mut delegated_staking_pool_allowlist = AHashMap::new();
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["DelegatedStakingPoolAllowlist"])
.inc();
tracing::warn!(
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return Ok(delegated_staking_pool_allowlist);
},
};
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let current_delegated_staking_pool_allowlist =
CurrentDelegatedStakingPoolAllowlist {
last_transaction_version: txn_version,
staking_pool_address: standardize_address(&ev.pool_address),
delegator_address: standardize_address(&ev.delegator_address),
is_allowed: ev.enabled,
};
delegated_staking_pool_allowlist.insert(
(
current_delegated_staking_pool_allowlist
.delegator_address
.clone(),
current_delegated_staking_pool_allowlist
.staking_pool_address
.clone(),
),
current_delegated_staking_pool_allowlist,
);
}
}
}
Ok(delegated_staking_pool_allowlist)
}
}

impl DelegatedStakingPoolAllowlist {
pub fn from_transaction(transaction: &Transaction) -> anyhow::Result<Vec<Self>> {
let mut delegated_staking_pool_allowlist = vec![];
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["DelegatedStakingPoolAllowlist"])
.inc();
tracing::warn!(
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return Ok(delegated_staking_pool_allowlist);
},
};
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
if let Some(StakeEvent::AllowlistDelegatorEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
delegated_staking_pool_allowlist.push(Self {
transaction_version: txn_version,
staking_pool_address: standardize_address(&ev.pool_address),
delegator_address: standardize_address(&ev.delegator_address),
is_allowed: ev.enabled,
});
}
}
}
Ok(delegated_staking_pool_allowlist)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::stake_utils::{StakeResource, StakeTableItem};
use super::stake_utils::{StakeEvent, StakeResource, StakeTableItem};
use crate::{
schema::{
current_delegated_staking_pool_balances, delegated_staking_pool_balances,
Expand All @@ -30,7 +30,10 @@ pub type DelegatorPoolBalanceMap = AHashMap<StakingPoolAddress, CurrentDelegator
#[diesel(table_name = delegated_staking_pools)]
pub struct DelegatorPool {
pub staking_pool_address: String,
// We should add a new field like `last_transaction_version` to track the last transaction version
// that updated the pool
pub first_transaction_version: i64,
pub allowlist_enabled: bool,
}

// Metadata to fill pool balances and delegator balance
Expand Down Expand Up @@ -136,6 +139,41 @@ impl DelegatorPool {
}
}
}
let txn_version = transaction.version as i64;

let events = match txn_data {
TxnData::User(txn) => &txn.events,
TxnData::BlockMetadata(txn) => &txn.events,
_ => {
return Ok((
delegator_pool_map,
delegator_pool_balances,
delegator_pool_balances_map,
))
},
};

for event in events {
if let Some(StakeEvent::AllowlistingEvent(inner)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let staking_pool_address = standardize_address(&inner.pool_address);
let enabled = inner.enabled;
if delegator_pool_map.contains_key(&staking_pool_address) {
delegator_pool_map
.get_mut(&staking_pool_address)
.expect("Pool should exist")
.allowlist_enabled = enabled;
} else {
let pool = DelegatorPool {
staking_pool_address: staking_pool_address.clone(),
first_transaction_version: txn_version,
allowlist_enabled: enabled,
};
delegator_pool_map.insert(staking_pool_address.clone(), pool);
}
}
}
}
Ok((
delegator_pool_map,
Expand Down Expand Up @@ -211,6 +249,7 @@ impl DelegatorPool {
Self {
staking_pool_address: staking_pool_address.clone(),
first_transaction_version: transaction_version,
allowlist_enabled: false,
},
DelegatorPoolBalance {
transaction_version,
Expand Down
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/stake_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod current_delegated_voter;
pub mod delegator_activities;
pub mod delegator_allowlist;
pub mod delegator_balances;
pub mod delegator_pools;
pub mod proposal_votes;
Expand Down
21 changes: 21 additions & 0 deletions rust/processor/src/db/common/models/stake_models/stake_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ pub struct ReactivateStakeEvent {
pub delegator_address: String,
pub pool_address: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EnableAllowlistingEvent {
pub pool_address: String,
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EnableDelegatorAllowlistingEvent {
pub pool_address: String,
pub delegator_address: String,
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StakeTableItem {
Expand Down Expand Up @@ -195,6 +207,8 @@ pub enum StakeEvent {
UnlockStakeEvent(UnlockStakeEvent),
WithdrawStakeEvent(WithdrawStakeEvent),
ReactivateStakeEvent(ReactivateStakeEvent),
AllowlistingEvent(EnableAllowlistingEvent),
AllowlistDelegatorEvent(EnableDelegatorAllowlistingEvent),
}

impl StakeEvent {
Expand All @@ -216,6 +230,13 @@ impl StakeEvent {
},
"0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))),
"0x1::delegation_pool::EnableDelegatorsAllowlisting"
| "0x1::delegation_pool::DisableDelegatorsAllowlisting" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::AllowlistingEvent(inner)))
},
"0x1::delegation_pool::AllowlistDelegator"
| "0x1::delegation_pool::RemoveDelegatorFromAllowlist" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::AllowlistDelegatorEvent(inner))),
_ => Ok(None),
}
.context(format!(
Expand Down
27 changes: 27 additions & 0 deletions rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,18 @@ diesel::table! {
}
}

diesel::table! {
current_delegated_staking_pool_allowlist (delegator_address, staking_pool_address) {
#[max_length = 66]
staking_pool_address -> Varchar,
#[max_length = 66]
delegator_address -> Varchar,
is_allowed -> Bool,
last_transaction_version -> Int8,
inserted_at -> Timestamp,
}
}

diesel::table! {
current_delegated_staking_pool_balances (staking_pool_address) {
#[max_length = 66]
Expand Down Expand Up @@ -694,6 +706,18 @@ diesel::table! {
}
}

diesel::table! {
delegated_staking_pool_allowlist (transaction_version, delegator_address, staking_pool_address) {
#[max_length = 66]
staking_pool_address -> Varchar,
#[max_length = 66]
delegator_address -> Varchar,
is_allowed -> Bool,
transaction_version -> Int8,
inserted_at -> Timestamp,
}
}

diesel::table! {
delegated_staking_pool_balances (transaction_version, staking_pool_address) {
transaction_version -> Int8,
Expand All @@ -716,6 +740,7 @@ diesel::table! {
staking_pool_address -> Varchar,
first_transaction_version -> Int8,
inserted_at -> Timestamp,
allowlist_enabled -> Bool,
}
}

Expand Down Expand Up @@ -1300,6 +1325,7 @@ diesel::allow_tables_to_appear_in_same_query!(
current_coin_balances,
current_collection_datas,
current_collections_v2,
current_delegated_staking_pool_allowlist,
current_delegated_staking_pool_balances,
current_delegated_voter,
current_delegator_balances,
Expand All @@ -1316,6 +1342,7 @@ diesel::allow_tables_to_appear_in_same_query!(
current_token_v2_metadata,
current_unified_fungible_asset_balances_to_be_renamed,
delegated_staking_activities,
delegated_staking_pool_allowlist,
delegated_staking_pool_balances,
delegated_staking_pools,
delegator_balances,
Expand Down
27 changes: 24 additions & 3 deletions rust/processor/src/processors/stake_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use crate::{
db::common::models::stake_models::{
current_delegated_voter::CurrentDelegatedVoter,
delegator_activities::DelegatedStakingActivity,
delegator_allowlist::{
CurrentDelegatedStakingPoolAllowlist, DelegatedStakingPoolAllowlist,
},
delegator_balances::{
CurrentDelegatorBalance, CurrentDelegatorBalanceMap, DelegatorBalance,
},
Expand Down Expand Up @@ -303,8 +306,9 @@ fn insert_delegator_pools_query(
.on_conflict(staking_pool_address)
.do_update()
.set((
first_transaction_version.eq(excluded(first_transaction_version)),
inserted_at.eq(excluded(inserted_at)),
first_transaction_version.eq(excluded(first_transaction_version)),
allowlist_enabled.eq(excluded(allowlist_enabled)),
inserted_at.eq(excluded(inserted_at)),
)),
Some(
" WHERE delegated_staking_pools.first_transaction_version >= EXCLUDED.first_transaction_version ",
Expand Down Expand Up @@ -416,6 +420,9 @@ impl ProcessorTrait for StakeProcessor {
let mut all_current_delegated_voter = AHashMap::new();
let mut all_vote_delegation_handle_to_pool_address = AHashMap::new();

let mut all_delegator_allowlist = vec![];
let mut all_current_delegator_allowlist = AHashMap::new();

for txn in &transactions {
// Add votes data
let current_stake_pool_voter = CurrentStakingPoolVoter::from_transaction(txn).unwrap();
Expand All @@ -430,10 +437,24 @@ impl ProcessorTrait for StakeProcessor {
// Add delegator pools
let (delegator_pools, mut delegator_pool_balances, current_delegator_pool_balances) =
DelegatorPool::from_transaction(txn).unwrap();
all_delegator_pools.extend(delegator_pools);
for (pool_address, pool) in delegator_pools.iter() {
// We need to keep the first transaction version for each pool.
if let Some(existing_pool) = all_delegator_pools.get_mut(pool_address) {
existing_pool.allowlist_enabled = pool.allowlist_enabled;
} else {
all_delegator_pools.insert(pool_address.clone(), pool.clone());
}
}
all_delegator_pool_balances.append(&mut delegator_pool_balances);
all_current_delegator_pool_balances.extend(current_delegator_pool_balances);

// Add delegator pool allowlist.
let delegator_allowlist = DelegatedStakingPoolAllowlist::from_transaction(txn).unwrap();
all_delegator_allowlist.extend(delegator_allowlist);
let current_delegator_allowlist =
CurrentDelegatedStakingPoolAllowlist::from_transaction(txn).unwrap();
all_current_delegator_allowlist.extend(current_delegator_allowlist);

// Moving the transaction code here is the new paradigm to avoid redoing a lot of the duplicate work
// Currently only delegator voting follows this paradigm
// TODO: refactor all the other staking code to follow this paradigm
Expand Down
Loading