Skip to content

Commit

Permalink
[event_v2] indexer logic update to handle migration (#360)
Browse files Browse the repository at this point in the history
* [event_v2] an example code for event v2 migration

* event v2 indexing.

---------

Co-authored-by: Larry Liu <[email protected]>
  • Loading branch information
lightmark and larry-aptos committed Jul 15, 2024
1 parent 7e24dd8 commit 8f9cb42
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 67 deletions.
14 changes: 12 additions & 2 deletions python/processors/nft_orderbooks/parsers/okx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"okx_listing_utils::CancelListingEvent",
]
)
DEPOSIT_EVENT_V1 = "0x3::token::DepositEvent"
DEPOSIT_EVENT_V2 = "0x3::token::Deposit"


def parse_marketplace_events(
Expand Down Expand Up @@ -141,8 +143,16 @@ def parse_marketplace_events(
def get_token_data_from_deposit_events(user_transaction) -> Dict[str, TokenDataIdType]:
# Extract deposit events, which contain token metadata
deposit_events: Dict[str, TokenDataIdType] = {}
for event in user_transaction.events:
if event.type_str != "0x3::token::DepositEvent":
for idx, event in enumerate(user_transaction.events):
if event.type_str != DEPOSIT_EVENT_V1 and event.type_str != DEPOSIT_EVENT_V2:
continue
# Current event is either DEPOSIT_EVENT_V1 or DEPOSIT_EVENT_V2.
if (
idx > 0
# skip if prior event is V2 deposit event.
and user_transaction.events[idx - 1].type_str == DEPOSIT_EVENT_V2
and event.type_str == DEPOSIT_EVENT_V1
):
continue
account_address = standardize_address(event_utils.get_account_address(event))
data = json.loads(event.data)
Expand Down
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ google-cloud-googleapis = "0.10.0"
google-cloud-pubsub = "0.18.0"
hex = "0.4.3"
itertools = "0.12.1"
lazy_static = "1.4.0"
jemallocator = { version = "0.5.0", features = [
"profiling",
"unprefixed_malloc_on_supported_platforms",
] }
kanal = { version = "0.1.0-pre8", features = ["async"] }
lazy_static = "1.4.0"
once_cell = "1.10.0"
num_cpus = "1.16.0"
pbjson = "0.5.1"
Expand Down
64 changes: 48 additions & 16 deletions rust/processor/src/db/common/models/coin_models/coin_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
},
v2_fungible_asset_utils::FeeStatement,
},
should_skip,
user_transactions_models::signatures::Signature,
},
schema::coin_activities,
Expand Down Expand Up @@ -186,6 +187,16 @@ impl CoinActivity {
}
}
for (index, event) in events.iter().enumerate() {
if should_skip(
event,
if index == 0 {
None
} else {
Some(&events[index - 1])
},
) {
continue;
}
let event_type = event.type_str.clone();
if let Some(parsed_event) =
CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap()
Expand Down Expand Up @@ -222,23 +233,44 @@ impl CoinActivity {
transaction_timestamp: chrono::NaiveDateTime,
event_index: i64,
) -> Self {
let amount = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => inner.amount.clone(),
CoinEvent::DepositCoinEvent(inner) => inner.amount.clone(),
};
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
let (owner_address, amount, coin_type_option) = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
None,
),
CoinEvent::DepositCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
None,
),
CoinEvent::WithdrawCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
CoinEvent::DepositCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
};
let coin_type =
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
} else {
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
};
event_to_coin_type
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone();
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone()
};

Self {
transaction_version: txn_version,
Expand All @@ -247,7 +279,7 @@ impl CoinActivity {
),
event_creation_number: event.key.as_ref().unwrap().creation_number as i64,
event_sequence_number: event.sequence_number as i64,
owner_address: standardize_address(&event.key.as_ref().unwrap().account_address),
owner_address,
coin_type,
amount,
activity_type: event_type.to_string(),
Expand Down
24 changes: 24 additions & 0 deletions rust/processor/src/db/common/models/coin_models/coin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ pub struct DepositCoinEvent {
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WithdrawCoinEventV2 {
pub coin_type: String,
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DepositCoinEventV2 {
pub coin_type: String,
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

pub struct CoinInfoType {
coin_type: String,
creator_address: String,
Expand Down Expand Up @@ -313,6 +329,8 @@ impl CoinResource {
pub enum CoinEvent {
WithdrawCoinEvent(WithdrawCoinEvent),
DepositCoinEvent(DepositCoinEvent),
WithdrawCoinEventV2(WithdrawCoinEventV2),
DepositCoinEventV2(DepositCoinEventV2),
}

impl CoinEvent {
Expand All @@ -324,6 +342,12 @@ impl CoinEvent {
"0x1::coin::DepositEvent" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEvent(inner)))
},
"0x1::coin::CoinWithdraw" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::WithdrawCoinEventV2(inner)))
},
"0x1::coin::CoinDeposit" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEventV2(inner)))
},
_ => Ok(None),
}
.context(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ impl FungibleAssetActivity {
inner.amount.clone(),
None,
),
CoinEvent::WithdrawCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
CoinEvent::DepositCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
};
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
Expand Down
93 changes: 93 additions & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use ahash::AHashMap;
use aptos_protos::transaction::v1::Event;
use lazy_static::lazy_static;

pub mod account_transaction_models;
pub mod ans_models;
pub mod coin_models;
Expand All @@ -16,3 +20,92 @@ pub mod token_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transactions_models;

lazy_static! {
pub static ref V1_TO_V2_MAPPING: AHashMap<&'static str, &'static str> = {
vec![
(
"0x1::aptos_governance::VoteEvent",
"0x1::aptos_governance::Vote",
),
("0x1::coin::CoinDepositEvent", "0x1::coin::CoinDeposit"),
("0x1::coin::CoinWithdrawEvent", "0x1::coin::CoinWithdraw"),
(
"0x1::delegation_pool::AddStakeEvent",
"0x1::delegation_pool::AddStake",
),
(
"0x1::delegation_pool::UnlockStakeEvent",
"0x1::delegation_pool::UnlockStake",
),
(
"0x1::delegation_pool::WithdrawStakeEvent",
"0x1::delegation_pool::WithdrawStake",
),
(
"0x1::delegation_pool::ReactivateStakeEvent",
"0x1::delegation_pool::ReactivateStake",
),
("0x1::object::TransferEvent", "0x1::object::Transfer"),
(
"0x1::stake::DistributeRewardsEvent",
"0x1::stake::DistributeRewards",
),
("0x3::token::MintTokenEvent", "0x3::token::MintToken"),
("0x3::token::BurnTokenEvent", "0x3::token::BurnToken"),
(
"0x3::token::MutateTokenPropertyMapEvent",
"0x3::token::MutateTokenPropertyMap",
),
("0x3::token::WithdrawEvent", "0x3::token::Withdraw"),
("0x3::token::DepositEvent", "0x3::token::Deposit"),
(
"0x3::token_transfers::TokenOfferEvent",
"0x3::token_transfers::TokenOffer",
),
(
"0x3::token_transfers::TokenCancelOfferEvent",
"0x3::token_transfers::TokenCancelOffer",
),
(
"0x3::token_transfers::TokenClaimEvent",
"0x3::token_transfers::TokenClaim",
),
("0x4::token::MutationEvent", "0x4::token::Mutation"),
]
.into_iter()
.collect()
};
}

pub(crate) fn should_skip(event: &Event, prev_event: Option<&Event>) -> bool {
if let (Some(prev_event_type), Some(prev_event)) =
(V1_TO_V2_MAPPING.get(&event.type_str.as_str()), prev_event)
{
*prev_event_type == prev_event.type_str
} else {
false
}
}

#[cfg(test)]
mod tests {
use super::*;
// Tests are to make sure
// - only when previous processed event is event v2, current event v1 can be skipped
#[test]
fn test_should_not_break_for_length() {
let events = [
Event {
type_str: "0x4::token::Mutation".to_string(),
..Event::default()
},
Event {
type_str: "0x4::token::MutationEvent".to_string(),
..Event::default()
},
];
assert!(!should_skip(&events[0], None));
assert!(should_skip(&events[1], Some(&events[0])));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use super::stake_utils::StakeEvent;
use crate::{
db::common::models::should_skip,
schema::delegated_staking_activities,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
Expand Down Expand Up @@ -58,6 +59,16 @@ impl DelegatedStakingActivity {
if let Some(staking_event) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
if should_skip(
event,
if index == 0 {
None
} else {
Some(&events[index - 1])
},
) {
continue;
}
let activity = match staking_event {
StakeEvent::AddStakeEvent(inner) => DelegatedStakingActivity {
transaction_version: txn_version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use super::stake_utils::StakeEvent;
use crate::{
db::common::models::should_skip,
schema::proposal_votes,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
Expand Down Expand Up @@ -49,10 +50,20 @@ impl ProposalVote {
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
for (index, event) in user_txn.events.iter().enumerate() {
if let Some(StakeEvent::GovernanceVoteEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
if should_skip(
event,
if index == 0 {
None
} else {
Some(&user_txn.events[index - 1])
},
) {
continue;
};
proposal_votes.push(Self {
transaction_version: txn_version,
proposal_id: ev.proposal_id as i64,
Expand Down
17 changes: 10 additions & 7 deletions rust/processor/src/db/common/models/stake_models/stake_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,24 @@ pub enum StakeEvent {
impl StakeEvent {
pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<Self>> {
match data_type {
"0x1::aptos_governance::VoteEvent" => {
"0x1::aptos_governance::VoteEvent" | "0x1::aptos_governance::Vote" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::GovernanceVoteEvent(inner)))
},
"0x1::stake::DistributeRewardsEvent" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner))),
"0x1::delegation_pool::AddStakeEvent" => {
"0x1::stake::DistributeRewardsEvent" | "0x1::stake::DistributeRewards" => {
serde_json::from_str(data)
.map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner)))
},
"0x1::delegation_pool::AddStakeEvent" | "0x1::delegation_pool::AddStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::AddStakeEvent(inner)))
},
"0x1::delegation_pool::UnlockStakeEvent" => {
"0x1::delegation_pool::UnlockStakeEvent" | "0x1::delegation_pool::UnlockStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::UnlockStakeEvent(inner)))
},
"0x1::delegation_pool::WithdrawStakeEvent" => {
"0x1::delegation_pool::WithdrawStakeEvent" | "0x1::delegation_pool::WithdrawStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::WithdrawStakeEvent(inner)))
},
"0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data)
"0x1::delegation_pool::ReactivateStakeEvent"
| "0x1::delegation_pool::ReactivateStake" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))),
_ => Ok(None),
}
Expand Down
Loading

0 comments on commit 8f9cb42

Please sign in to comment.