Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Staking Models and Create Parquet Stake Processor #647

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod object_models;
pub mod stake_models;
pub mod token_v2_models;
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use super::{
delegator_balances::{CurrentDelegatorBalance, ShareToStakingPoolMapping},
stake_utils::VoteDelegationTableItem,
};
use super::delegator_balances::ShareToRawStakingPoolMapping;
use crate::{
db::common::models::stake_models::{
delegator_balances::RawCurrentDelegatorBalance, stake_utils::VoteDelegationTableItem,
},
schema::current_delegated_voter,
utils::{database::DbPoolConnection, util::standardize_address},
};
Expand Down Expand Up @@ -135,14 +135,14 @@ impl CurrentDelegatedVoter {
write_table_item: &WriteTableItem,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
active_pool_to_staking_pool: &ShareToRawStakingPoolMapping,
previous_delegated_voters: &CurrentDelegatedVoterMap,
conn: &mut DbPoolConnection<'_>,
query_retries: u32,
query_retry_delay_ms: u64,
) -> anyhow::Result<Option<Self>> {
if let Some((_, active_balance)) =
CurrentDelegatorBalance::get_active_share_from_write_table_item(
RawCurrentDelegatorBalance::get_active_share_from_write_table_item(
write_table_item,
txn_version,
0, // placeholder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright © Aptos Foundation

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::stake_models::stake_utils::StakeEvent,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
util::{parse_timestamp, standardize_address, u64_to_bigdecimal},
},
};
use aptos_protos::transaction::v1::{transaction::TxnData, Transaction};
use bigdecimal::BigDecimal;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RawDelegatedStakingActivity {
pub transaction_version: i64,
pub event_index: i64,
pub delegator_address: String,
pub pool_address: String,
pub event_type: String,
pub amount: BigDecimal,
pub block_timestamp: chrono::NaiveDateTime,
}
pub trait RawDelegatedStakingActivityConvertible {
fn from_raw(raw: RawDelegatedStakingActivity) -> Self;
}

impl RawDelegatedStakingActivity {
/// Pretty straightforward parsing from known delegated staking events
pub fn from_transaction(transaction: &Transaction) -> anyhow::Result<Vec<Self>> {
let mut delegator_activities = vec![];
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["DelegatedStakingActivity"])
.inc();
tracing::warn!(

Check warning on line 41 in rust/processor/src/db/common/models/stake_models/delegator_activities.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/stake_models/delegator_activities.rs#L38-L41

Added lines #L38 - L41 were not covered by tests
transaction_version = transaction.version,
"Transaction data doesn't exist",

Check warning on line 43 in rust/processor/src/db/common/models/stake_models/delegator_activities.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/stake_models/delegator_activities.rs#L43

Added line #L43 was not covered by tests
);
return Ok(delegator_activities);

Check warning on line 45 in rust/processor/src/db/common/models/stake_models/delegator_activities.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/stake_models/delegator_activities.rs#L45

Added line #L45 was not covered by tests
},
};

let txn_version = transaction.version as i64;
let events = match txn_data {
TxnData::User(txn) => &txn.events,
TxnData::BlockMetadata(txn) => &txn.events,
TxnData::Validator(txn) => &txn.events,
_ => return Ok(delegator_activities),

Check warning on line 54 in rust/processor/src/db/common/models/stake_models/delegator_activities.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/stake_models/delegator_activities.rs#L52-L54

Added lines #L52 - L54 were not covered by tests
};
let block_timestamp = parse_timestamp(transaction.timestamp.as_ref().unwrap(), txn_version);
for (index, event) in events.iter().enumerate() {
let event_index = index as i64;
if let Some(staking_event) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let activity = match staking_event {
StakeEvent::AddStakeEvent(inner) => RawDelegatedStakingActivity {
transaction_version: txn_version,
event_index,
delegator_address: standardize_address(&inner.delegator_address),
pool_address: standardize_address(&inner.pool_address),
event_type: event.type_str.clone(),
amount: u64_to_bigdecimal(inner.amount_added),
block_timestamp,
},
StakeEvent::UnlockStakeEvent(inner) => RawDelegatedStakingActivity {
transaction_version: txn_version,
event_index,
delegator_address: standardize_address(&inner.delegator_address),
pool_address: standardize_address(&inner.pool_address),
event_type: event.type_str.clone(),
amount: u64_to_bigdecimal(inner.amount_unlocked),
block_timestamp,
},
StakeEvent::WithdrawStakeEvent(inner) => RawDelegatedStakingActivity {
transaction_version: txn_version,
event_index,
delegator_address: standardize_address(&inner.delegator_address),
pool_address: standardize_address(&inner.pool_address),
event_type: event.type_str.clone(),
amount: u64_to_bigdecimal(inner.amount_withdrawn),
block_timestamp,
},
StakeEvent::ReactivateStakeEvent(inner) => RawDelegatedStakingActivity {
transaction_version: txn_version,
event_index,
delegator_address: standardize_address(&inner.delegator_address),
pool_address: standardize_address(&inner.pool_address),
event_type: event.type_str.clone(),
amount: u64_to_bigdecimal(inner.amount_reactivated),
block_timestamp,
},
StakeEvent::DistributeRewardsEvent(inner) => RawDelegatedStakingActivity {
transaction_version: txn_version,
event_index,
delegator_address: "".to_string(),
pool_address: standardize_address(&inner.pool_address),
event_type: event.type_str.clone(),
amount: u64_to_bigdecimal(inner.rewards_amount),
block_timestamp,
},
_ => continue,

Check warning on line 108 in rust/processor/src/db/common/models/stake_models/delegator_activities.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/stake_models/delegator_activities.rs#L90-L108

Added lines #L90 - L108 were not covered by tests
};
delegator_activities.push(activity);
}
}
Ok(delegator_activities)
}
}
Loading
Loading