From 4d655b4a5a314e842bdbc1edc2ac4a7c3bc89af1 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Tue, 30 Apr 2024 23:37:48 -0600 Subject: [PATCH 1/2] [event_v2] an example code for event v2 migration --- .../src/models/coin_models/coin_activities.rs | 55 +++++++---- .../src/models/coin_models/coin_utils.rs | 46 ++++++++- .../v2_fungible_asset_activities.rs | 95 ++++++++++++++----- .../v2_fungible_asset_utils.rs | 32 +++++++ rust/processor/src/models/mod.rs | 9 ++ .../src/models/stake_models/proposal_votes.rs | 6 +- .../src/models/stake_models/stake_utils.rs | 17 ++-- .../src/models/token_models/token_utils.rs | 21 ++-- .../processors/fungible_asset_processor.rs | 5 + 9 files changed, 221 insertions(+), 65 deletions(-) diff --git a/rust/processor/src/models/coin_models/coin_activities.rs b/rust/processor/src/models/coin_models/coin_activities.rs index 86e750117..5b348fb79 100644 --- a/rust/processor/src/models/coin_models/coin_activities.rs +++ b/rust/processor/src/models/coin_models/coin_activities.rs @@ -20,6 +20,7 @@ use crate::{ }, v2_fungible_asset_utils::FeeStatement, }, + should_skip, user_transactions_models::signatures::Signature, }, processors::coin_processor::APTOS_COIN_TYPE_STR, @@ -198,14 +199,18 @@ impl CoinActivity { } } for (index, event) in events.iter().enumerate() { + if should_skip(index, event, events) { + continue; + } let event_type = event.type_str.clone(); - if let Some(parsed_event) = + if let Some((parsed_event, coin_type_option)) = CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap() { coin_activities.push(Self::from_parsed_event( &event_type, event, &parsed_event, + coin_type_option, txn_version, &all_event_to_coin_type, block_height, @@ -228,6 +233,7 @@ impl CoinActivity { event_type: &str, event: &EventPB, coin_event: &CoinEvent, + coin_type_option: Option, txn_version: i64, event_to_coin_type: &EventToCoinType, block_height: i64, @@ -235,23 +241,38 @@ impl CoinActivity { transaction_timestamp: chrono::NaiveDateTime, event_index: i64, ) -> Self { - let amount = match coin_event { - CoinEvent::WithdrawCoinEvent(inner) => inner.amount.clone(), - CoinEvent::DepositCoinEvent(inner) => inner.amount.clone(), - }; - let event_move_guid = EventGuidResource { - addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()), - creation_num: event.key.as_ref().unwrap().creation_number as i64, + let (owner_address, amount) = match coin_event { + CoinEvent::WithdrawCoinEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + inner.amount.clone(), + ), + CoinEvent::DepositCoinEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + inner.amount.clone(), + ), + CoinEvent::WithdrawCoinEventV2(inner) => { + (standardize_address(&inner.account), inner.amount.clone()) + }, + CoinEvent::DepositCoinEventV2(inner) => { + (standardize_address(&inner.account), inner.amount.clone()) + }, }; - let coin_type = + let coin_type = if let Some(coin_type) = coin_type_option { + coin_type + } else { + let event_move_guid = EventGuidResource { + addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()), + creation_num: event.key.as_ref().unwrap().creation_number as i64, + }; event_to_coin_type - .get(&event_move_guid) - .unwrap_or_else(|| { - panic!( - "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}", - txn_version, event_move_guid, event_to_coin_type - ) - }).clone(); + .get(&event_move_guid) + .unwrap_or_else(|| { + panic!( + "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}", + txn_version, event_move_guid, event_to_coin_type + ) + }).clone() + }; Self { transaction_version: txn_version, @@ -260,7 +281,7 @@ impl CoinActivity { ), event_creation_number: event.key.as_ref().unwrap().creation_number as i64, event_sequence_number: event.sequence_number as i64, - owner_address: standardize_address(&event.key.as_ref().unwrap().account_address), + owner_address, coin_type, amount, activity_type: event_type.to_string(), diff --git a/rust/processor/src/models/coin_models/coin_utils.rs b/rust/processor/src/models/coin_models/coin_utils.rs index 40ab07bdc..8294ced96 100644 --- a/rust/processor/src/models/coin_models/coin_utils.rs +++ b/rust/processor/src/models/coin_models/coin_utils.rs @@ -158,6 +158,20 @@ pub struct DepositCoinEvent { pub amount: BigDecimal, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WithdrawCoinEventV2 { + pub account: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub amount: BigDecimal, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DepositCoinEventV2 { + pub account: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub amount: BigDecimal, +} + pub struct CoinInfoType { coin_type: String, creator_address: String, @@ -288,16 +302,38 @@ impl CoinResource { pub enum CoinEvent { WithdrawCoinEvent(WithdrawCoinEvent), DepositCoinEvent(DepositCoinEvent), + WithdrawCoinEventV2(WithdrawCoinEventV2), + DepositCoinEventV2(DepositCoinEventV2), } impl CoinEvent { - pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result> { + pub fn from_event( + data_type: &str, + data: &str, + txn_version: i64, + ) -> Result)>> { match data_type { - "0x1::coin::WithdrawEvent" => { - serde_json::from_str(data).map(|inner| Some(CoinEvent::WithdrawCoinEvent(inner))) + "0x1::coin::WithdrawEvent" => serde_json::from_str(data) + .map(|inner| Some((CoinEvent::WithdrawCoinEvent(inner), None))), + "0x1::coin::DepositEvent" => serde_json::from_str(data) + .map(|inner| Some((CoinEvent::WithdrawCoinEvent(inner), None))), + t if t.starts_with("0x1::coin::Withdraw") => { + let inner_type_start = t.find('<').unwrap(); + serde_json::from_str(data).map(|inner| { + Some(( + CoinEvent::WithdrawCoinEventV2(inner), + Some(t[inner_type_start + 1..t.len() - 1].into()), + )) + }) }, - "0x1::coin::DepositEvent" => { - serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEvent(inner))) + t if t.starts_with("0x1::coin::Deposit") => { + let inner_type_start = t.find('<').unwrap(); + serde_json::from_str(data).map(|inner| { + Some(( + CoinEvent::WithdrawCoinEventV2(inner), + Some(t[inner_type_start + 1..t.len() - 1].into()), + )) + }) }, _ => Ok(None), } diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs index 06802ba1e..9d651d24a 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs @@ -72,7 +72,38 @@ impl FungibleAssetActivity { if let Some(fa_event) = &FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)? { - let storage_id = standardize_address(&event.key.as_ref().unwrap().account_address); + let (storage_id, is_frozen, amount) = match fa_event { + FungibleAssetEvent::WithdrawEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + None, + Some(inner.amount.clone()), + ), + FungibleAssetEvent::DepositEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + None, + Some(inner.amount.clone()), + ), + FungibleAssetEvent::FrozenEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + Some(inner.frozen), + None, + ), + FungibleAssetEvent::WithdrawEventV2(inner) => ( + inner.store.get_reference_address(), + None, + Some(inner.amount.clone()), + ), + FungibleAssetEvent::DepositEventV2(inner) => ( + inner.store.get_reference_address(), + None, + Some(inner.amount.clone()), + ), + FungibleAssetEvent::FrozenEventV2(inner) => ( + inner.store.get_reference_address(), + Some(inner.frozen), + None, + ), + }; // The event account address will also help us find fungible store which tells us where to find // the metadata @@ -81,12 +112,6 @@ impl FungibleAssetActivity { let fungible_asset = object_metadata.fungible_asset_store.as_ref().unwrap(); let asset_type = fungible_asset.metadata.get_reference_address(); - let (is_frozen, amount) = match fa_event { - FungibleAssetEvent::WithdrawEvent(inner) => (None, Some(inner.amount.clone())), - FungibleAssetEvent::DepositEvent(inner) => (None, Some(inner.amount.clone())), - FungibleAssetEvent::FrozenEvent(inner) => (Some(inner.frozen), None), - }; - return Ok(Some(Self { transaction_version: txn_version, event_index, @@ -119,36 +144,54 @@ impl FungibleAssetActivity { event_to_coin_type: &EventToCoinType, event_index: i64, ) -> anyhow::Result> { - if let Some(inner) = + if let Some((inner, coin_type_option)) = CoinEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? { - let amount = match inner { - CoinEvent::WithdrawCoinEvent(inner) => inner.amount, - CoinEvent::DepositCoinEvent(inner) => inner.amount, - }; - let event_key = event.key.as_ref().context("event must have a key")?; - let event_move_guid = EventGuidResource { - addr: standardize_address(event_key.account_address.as_str()), - creation_num: event_key.creation_number as i64, + let (owner_address, amount) = match inner { + CoinEvent::WithdrawCoinEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + inner.amount.clone(), + ), + CoinEvent::DepositCoinEvent(inner) => ( + standardize_address(&event.key.as_ref().unwrap().account_address), + inner.amount.clone(), + ), + CoinEvent::WithdrawCoinEventV2(inner) => { + (standardize_address(&inner.account), inner.amount.clone()) + }, + CoinEvent::DepositCoinEventV2(inner) => { + (standardize_address(&inner.account), inner.amount.clone()) + }, }; - // Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists. - // If it doesn't exist, skip. - let coin_type = match event_to_coin_type.get(&event_move_guid) { - Some(coin_type) => coin_type.clone(), - None => { - tracing::warn!( + let coin_type = if let Some(coin_type) = coin_type_option { + coin_type + } else { + let event_key = event.key.as_ref().context("event must have a key")?; + let event_move_guid = EventGuidResource { + addr: standardize_address(event_key.account_address.as_str()), + creation_num: event_key.creation_number as i64, + }; + // Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists. + // If it doesn't exist, skip. + match event_to_coin_type.get(&event_move_guid) { + Some(coin_type) => coin_type.clone(), + None => { + tracing::warn!( "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}", txn_version, event_move_guid, event_to_coin_type ); - return Ok(None); - }, + return Ok(None); + }, + } }; + let storage_id = - CoinInfoType::get_storage_id(coin_type.as_str(), event_move_guid.addr.as_str()); + CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str()); + Ok(Some(Self { transaction_version: txn_version, event_index, - owner_address: event_move_guid.addr, + owner_address, storage_id, asset_type: coin_type, is_frozen: None, diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs index da43ca2c0..547b528b4 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs @@ -204,6 +204,26 @@ pub struct FrozenEvent { pub frozen: bool, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DepositEventV2 { + pub store: ResourceReference, + #[serde(deserialize_with = "deserialize_from_string")] + pub amount: BigDecimal, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WithdrawEventV2 { + pub store: ResourceReference, + #[serde(deserialize_with = "deserialize_from_string")] + pub amount: BigDecimal, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct FrozenEventV2 { + pub store: ResourceReference, + pub frozen: bool, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum V2FungibleAssetResource { FungibleAssetMetadata(FungibleAssetMetadata), @@ -256,6 +276,9 @@ pub enum FungibleAssetEvent { DepositEvent(DepositEvent), WithdrawEvent(WithdrawEvent), FrozenEvent(FrozenEvent), + DepositEventV2(DepositEventV2), + WithdrawEventV2(WithdrawEventV2), + FrozenEventV2(FrozenEventV2), } impl FungibleAssetEvent { @@ -270,6 +293,15 @@ impl FungibleAssetEvent { "0x1::fungible_asset::FrozenEvent" => { serde_json::from_str(data).map(|inner| Some(Self::FrozenEvent(inner))) }, + "0x1::fungible_asset::Deposit" => { + serde_json::from_str(data).map(|inner| Some(Self::DepositEventV2(inner))) + }, + "0x1::fungible_asset::Withdraw" => { + serde_json::from_str(data).map(|inner| Some(Self::WithdrawEventV2(inner))) + }, + "0x1::fungible_asset::Frozen" => { + serde_json::from_str(data).map(|inner| Some(Self::FrozenEventV2(inner))) + }, _ => Ok(None), } .context(format!( diff --git a/rust/processor/src/models/mod.rs b/rust/processor/src/models/mod.rs index cf80f3fc0..a914f99bf 100644 --- a/rust/processor/src/models/mod.rs +++ b/rust/processor/src/models/mod.rs @@ -1,6 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use aptos_protos::transaction::v1::Event; + pub mod account_transaction_models; pub mod ans_models; pub mod coin_models; @@ -16,3 +18,10 @@ pub mod token_models; pub mod token_v2_models; pub mod transaction_metadata_model; pub mod user_transactions_models; + +pub(crate) fn should_skip(index: usize, event: &Event, events: &[Event]) -> bool { + let len = event.type_str.len(); + index > 0 + && event.type_str.ends_with("Event") + && events[index - 1].type_str[..len - 5] == event.type_str[..len - 5] +} diff --git a/rust/processor/src/models/stake_models/proposal_votes.rs b/rust/processor/src/models/stake_models/proposal_votes.rs index ebe473e0c..de9bfa4da 100644 --- a/rust/processor/src/models/stake_models/proposal_votes.rs +++ b/rust/processor/src/models/stake_models/proposal_votes.rs @@ -6,6 +6,7 @@ use super::stake_utils::StakeEvent; use crate::{ + models::should_skip, schema::proposal_votes, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -49,7 +50,10 @@ impl ProposalVote { let txn_version = transaction.version as i64; if let TxnData::User(user_txn) = txn_data { - for event in &user_txn.events { + for (index, event) in user_txn.events.iter().enumerate() { + if should_skip(index, event, &user_txn.events) { + continue; + }; if let Some(StakeEvent::GovernanceVoteEvent(ev)) = StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? { diff --git a/rust/processor/src/models/stake_models/stake_utils.rs b/rust/processor/src/models/stake_models/stake_utils.rs index 40cf75fc2..3dcf06088 100644 --- a/rust/processor/src/models/stake_models/stake_utils.rs +++ b/rust/processor/src/models/stake_models/stake_utils.rs @@ -198,21 +198,24 @@ pub enum StakeEvent { impl StakeEvent { pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result> { match data_type { - "0x1::aptos_governance::VoteEvent" => { + "0x1::aptos_governance::VoteEvent" | "0x1::aptos_governance::Vote" => { serde_json::from_str(data).map(|inner| Some(StakeEvent::GovernanceVoteEvent(inner))) }, - "0x1::stake::DistributeRewardsEvent" => serde_json::from_str(data) - .map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner))), - "0x1::delegation_pool::AddStakeEvent" => { + "0x1::stake::DistributeRewardsEvent" | "0x1::stake::DistributeRewards" => { + serde_json::from_str(data) + .map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner))) + }, + "0x1::delegation_pool::AddStakeEvent" | "0x1::delegation_pool::AddStake" => { serde_json::from_str(data).map(|inner| Some(StakeEvent::AddStakeEvent(inner))) }, - "0x1::delegation_pool::UnlockStakeEvent" => { + "0x1::delegation_pool::UnlockStakeEvent" | "0x1::delegation_pool::UnlockStake" => { serde_json::from_str(data).map(|inner| Some(StakeEvent::UnlockStakeEvent(inner))) }, - "0x1::delegation_pool::WithdrawStakeEvent" => { + "0x1::delegation_pool::WithdrawStakeEvent" | "0x1::delegation_pool::WithdrawStake" => { serde_json::from_str(data).map(|inner| Some(StakeEvent::WithdrawStakeEvent(inner))) }, - "0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data) + "0x1::delegation_pool::ReactivateStakeEvent" + | "0x1::delegation_pool::ReactivateStake" => serde_json::from_str(data) .map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))), _ => Ok(None), } diff --git a/rust/processor/src/models/token_models/token_utils.rs b/rust/processor/src/models/token_models/token_utils.rs index 94efbc77d..76c8b1ac5 100644 --- a/rust/processor/src/models/token_models/token_utils.rs +++ b/rust/processor/src/models/token_models/token_utils.rs @@ -403,26 +403,29 @@ pub enum TokenEvent { impl TokenEvent { pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result> { match data_type { - "0x3::token::MintTokenEvent" => { + "0x3::token::MintTokenEvent" | "0x3::token::MintToken" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::MintTokenEvent(inner))) }, - "0x3::token::BurnTokenEvent" => { + "0x3::token::BurnTokenEvent" | "0x3::token::BurnToken" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::BurnTokenEvent(inner))) }, - "0x3::token::MutateTokenPropertyMapEvent" => serde_json::from_str(data) - .map(|inner| Some(TokenEvent::MutateTokenPropertyMapEvent(inner))), - "0x3::token::WithdrawEvent" => { + "0x3::token::MutateTokenPropertyMapEvent" | "0x3::token::MutateTokenPropertyMap" => { + serde_json::from_str(data) + .map(|inner| Some(TokenEvent::MutateTokenPropertyMapEvent(inner))) + }, + "0x3::token::WithdrawEvent" | "0x3::token::Withdraw" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::WithdrawTokenEvent(inner))) }, - "0x3::token::DepositEvent" => { + "0x3::token::DepositEvent" | "0x3::token::Deposit" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::DepositTokenEvent(inner))) }, - "0x3::token_transfers::TokenOfferEvent" => { + "0x3::token_transfers::TokenOfferEvent" | "0x3::token_transfers::TokenOffer" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::OfferTokenEvent(inner))) }, - "0x3::token_transfers::TokenCancelOfferEvent" => serde_json::from_str(data) + "0x3::token_transfers::TokenCancelOfferEvent" + | "0x3::token_transfers::TokenCancelOffer" => serde_json::from_str(data) .map(|inner| Some(TokenEvent::CancelTokenOfferEvent(inner))), - "0x3::token_transfers::TokenClaimEvent" => { + "0x3::token_transfers::TokenClaimEvent" | "0x3::token_transfers::TokenClaim" => { serde_json::from_str(data).map(|inner| Some(TokenEvent::ClaimTokenEvent(inner))) }, _ => Ok(None), diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index c5f303b17..79bbff76f 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -17,6 +17,8 @@ use crate::{ object_models::v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, + should_skip, + token_v2_models::v2_token_utils::TokenV2, }, schema, utils::{ @@ -433,6 +435,9 @@ async fn parse_v2_coin( // Loop to handle events and collect additional metadata from events for v2 for (index, event) in events.iter().enumerate() { + if should_skip(index, event, events) { + continue; + }; if let Some(v1_activity) = FungibleAssetActivity::get_v1_from_event( event, txn_version, From 15a7fc9385e682ff4f39ab9df129d89214942ccd Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Wed, 15 May 2024 15:35:27 -0700 Subject: [PATCH 2/2] event v2 indexing. --- .../nft_orderbooks/parsers/okx_parser.py | 13 ++++- .../v2_fungible_asset_activities.rs | 12 ++--- .../v2_fungible_asset_utils.rs | 6 +-- rust/processor/src/models/mod.rs | 48 ++++++++++++++++++- .../stake_models/delegator_activities.rs | 4 ++ .../src/models/stake_models/proposal_votes.rs | 6 +-- .../models/token_v2_models/v2_token_utils.rs | 4 +- .../processors/fungible_asset_processor.rs | 1 - .../src/processors/token_v2_processor.rs | 4 ++ 9 files changed, 79 insertions(+), 19 deletions(-) diff --git a/python/processors/nft_orderbooks/parsers/okx_parser.py b/python/processors/nft_orderbooks/parsers/okx_parser.py index bb26d4553..7b01d7cb2 100644 --- a/python/processors/nft_orderbooks/parsers/okx_parser.py +++ b/python/processors/nft_orderbooks/parsers/okx_parser.py @@ -19,6 +19,8 @@ "okx_listing_utils::CancelListingEvent", ] ) +DEPOSITE_EVENT_V1 = "0x3::token::DepositEvent" +DEPOSITE_EVENT_V2 = "0x3::token::Deposit" def parse_marketplace_events( @@ -141,8 +143,15 @@ def parse_marketplace_events( def get_token_data_from_deposit_events(user_transaction) -> Dict[str, TokenDataIdType]: # Extract deposit events, which contain token metadata deposit_events: Dict[str, TokenDataIdType] = {} - for event in user_transaction.events: - if event.type_str != "0x3::token::DepositEvent": + for idx, event in enumerate(user_transaction.events): + if event.type_str != DEPOSITE_EVENT_V1 and event.type_str != DEPOSITE_EVENT_V2: + continue + # Current event is either DEPOSITE_EVENT_V1 or DEPOSITE_EVENT_V2. + if ( + idx > 0 + # skip if prior event is V2 deposit event. + and user_transaction.events[idx - 1].type_str == DEPOSITE_EVENT_V2 + ): continue account_address = standardize_address(event_utils.get_account_address(event)) data = json.loads(event.data) diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs index 9d651d24a..633f6503f 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs @@ -89,20 +89,18 @@ impl FungibleAssetActivity { None, ), FungibleAssetEvent::WithdrawEventV2(inner) => ( - inner.store.get_reference_address(), + standardize_address(&inner.store), None, Some(inner.amount.clone()), ), FungibleAssetEvent::DepositEventV2(inner) => ( - inner.store.get_reference_address(), + standardize_address(&inner.store), None, Some(inner.amount.clone()), ), - FungibleAssetEvent::FrozenEventV2(inner) => ( - inner.store.get_reference_address(), - Some(inner.frozen), - None, - ), + FungibleAssetEvent::FrozenEventV2(inner) => { + (standardize_address(&inner.store), Some(inner.frozen), None) + }, }; // The event account address will also help us find fungible store which tells us where to find diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs index 547b528b4..cc00ac5b3 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs @@ -206,21 +206,21 @@ pub struct FrozenEvent { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DepositEventV2 { - pub store: ResourceReference, + pub store: String, #[serde(deserialize_with = "deserialize_from_string")] pub amount: BigDecimal, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct WithdrawEventV2 { - pub store: ResourceReference, + pub store: String, #[serde(deserialize_with = "deserialize_from_string")] pub amount: BigDecimal, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FrozenEventV2 { - pub store: ResourceReference, + pub store: String, pub frozen: bool, } diff --git a/rust/processor/src/models/mod.rs b/rust/processor/src/models/mod.rs index a914f99bf..ae5a63ff3 100644 --- a/rust/processor/src/models/mod.rs +++ b/rust/processor/src/models/mod.rs @@ -23,5 +23,51 @@ pub(crate) fn should_skip(index: usize, event: &Event, events: &[Event]) -> bool let len = event.type_str.len(); index > 0 && event.type_str.ends_with("Event") - && events[index - 1].type_str[..len - 5] == event.type_str[..len - 5] + && events[index - 1] + .type_str + .starts_with(&event.type_str[..len - 5]) +} + +#[cfg(test)] +mod tests { + use super::*; + // Tests are to make sure + // - only when previous processed event is event v2, current event v1 can be skipped + + #[test] + fn test_should_skip_intended() { + let event1 = Event { + type_str: "0x1::coin::Deposit<0x1::aptos_coin::AptosCoin>".to_string(), + ..Event::default() + }; + let event2 = Event { + type_str: "0x1::coin::DepositEvent".to_string(), + ..Event::default() + }; + let events = vec![event1, event2]; + assert!(!should_skip(0, &events[0], &events)); + assert!(should_skip(1, &events[1], &events)); + } + + #[test] + fn test_should_not_break_for_length() { + let events = [ + Event { + type_str: "Test0000000000000000000000000000000Event".to_string(), + ..Event::default() + }, + Event { + type_str: "TEvent".to_string(), + ..Event::default() + }, + Event { + type_str: "Test0000000000000000000000000000000Event".to_string(), + ..Event::default() + }, + ]; + assert!(!should_skip(0, &events[0], &events)); + // Note, it is intentional that the second event is skipped. + assert!(should_skip(1, &events[1], &events)); + assert!(!should_skip(2, &events[2], &events)); + } } diff --git a/rust/processor/src/models/stake_models/delegator_activities.rs b/rust/processor/src/models/stake_models/delegator_activities.rs index 846f10f7f..59c5bfd86 100644 --- a/rust/processor/src/models/stake_models/delegator_activities.rs +++ b/rust/processor/src/models/stake_models/delegator_activities.rs @@ -5,6 +5,7 @@ use super::stake_utils::StakeEvent; use crate::{ + models::should_skip, schema::delegated_staking_activities, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -57,6 +58,9 @@ impl DelegatedStakingActivity { if let Some(staking_event) = StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? { + if should_skip(index, event, events) { + continue; + } let activity = match staking_event { StakeEvent::AddStakeEvent(inner) => DelegatedStakingActivity { transaction_version: txn_version, diff --git a/rust/processor/src/models/stake_models/proposal_votes.rs b/rust/processor/src/models/stake_models/proposal_votes.rs index de9bfa4da..5ed72f276 100644 --- a/rust/processor/src/models/stake_models/proposal_votes.rs +++ b/rust/processor/src/models/stake_models/proposal_votes.rs @@ -51,12 +51,12 @@ impl ProposalVote { if let TxnData::User(user_txn) = txn_data { for (index, event) in user_txn.events.iter().enumerate() { - if should_skip(index, event, &user_txn.events) { - continue; - }; if let Some(StakeEvent::GovernanceVoteEvent(ev)) = StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)? { + if should_skip(index, event, &user_txn.events) { + continue; + }; proposal_votes.push(Self { transaction_version: txn_version, proposal_id: ev.proposal_id as i64, diff --git a/rust/processor/src/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/models/token_v2_models/v2_token_utils.rs index 2b6f25243..6e7cbdc39 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_utils.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_utils.rs @@ -584,7 +584,7 @@ impl V2TokenEvent { "0x4::collection::MintEvent" => { serde_json::from_str(data).map(|inner| Some(Self::MintEvent(inner))) }, - "0x4::token::MutationEvent" => { + "0x4::token::MutationEvent" | "0x4::token::Mutation" => { serde_json::from_str(data).map(|inner| Some(Self::TokenMutationEvent(inner))) }, "0x4::collection::Burn" => { @@ -593,7 +593,7 @@ impl V2TokenEvent { "0x4::collection::BurnEvent" => { serde_json::from_str(data).map(|inner| Some(Self::BurnEvent(inner))) }, - "0x1::object::TransferEvent" => { + "0x1::object::TransferEvent" | "0x1::object::Transfer" => { serde_json::from_str(data).map(|inner| Some(Self::TransferEvent(inner))) }, _ => Ok(None), diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 79bbff76f..058362c68 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -18,7 +18,6 @@ use crate::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, should_skip, - token_v2_models::v2_token_utils::TokenV2, }, schema, utils::{ diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 005333e13..5dc4cc4db 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -8,6 +8,7 @@ use crate::{ object_models::v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, }, + should_skip, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, token_v2_models::{ v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, @@ -721,6 +722,9 @@ async fn parse_v2_token( // This needs to be here because we need the metadata above for token activities // and burn / transfer events need to come before the next section for (index, event) in user_txn.events.iter().enumerate() { + if should_skip(index, event, user_txn.events.as_slice()) { + continue; + } if let Some(burn_event) = Burn::from_event(event, txn_version).unwrap() { tokens_burned.insert(burn_event.get_token_address(), burn_event); }