diff --git a/python/processors/nft_orderbooks/parsers/okx_parser.py b/python/processors/nft_orderbooks/parsers/okx_parser.py
index bb26d4553..5f9c3154b 100644
--- a/python/processors/nft_orderbooks/parsers/okx_parser.py
+++ b/python/processors/nft_orderbooks/parsers/okx_parser.py
@@ -19,6 +19,8 @@
"okx_listing_utils::CancelListingEvent",
]
)
+DEPOSIT_EVENT_V1 = "0x3::token::DepositEvent"
+DEPOSIT_EVENT_V2 = "0x3::token::Deposit"
def parse_marketplace_events(
@@ -141,8 +143,16 @@ def parse_marketplace_events(
def get_token_data_from_deposit_events(user_transaction) -> Dict[str, TokenDataIdType]:
# Extract deposit events, which contain token metadata
deposit_events: Dict[str, TokenDataIdType] = {}
- for event in user_transaction.events:
- if event.type_str != "0x3::token::DepositEvent":
+ for idx, event in enumerate(user_transaction.events):
+ if event.type_str != DEPOSIT_EVENT_V1 and event.type_str != DEPOSIT_EVENT_V2:
+ continue
+ # Current event is either DEPOSIT_EVENT_V1 or DEPOSIT_EVENT_V2.
+ if (
+ idx > 0
+ # skip if prior event is V2 deposit event.
+ and user_transaction.events[idx - 1].type_str == DEPOSIT_EVENT_V2
+ and event.type_str == DEPOSIT_EVENT_V1
+ ):
continue
account_address = standardize_address(event_utils.get_account_address(event))
data = json.loads(event.data)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 12e7584cf..6e80091bc 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -61,12 +61,12 @@ google-cloud-googleapis = "0.10.0"
google-cloud-pubsub = "0.18.0"
hex = "0.4.3"
itertools = "0.12.1"
-lazy_static = "1.4.0"
jemallocator = { version = "0.5.0", features = [
"profiling",
"unprefixed_malloc_on_supported_platforms",
] }
kanal = { version = "0.1.0-pre8", features = ["async"] }
+lazy_static = "1.4.0"
once_cell = "1.10.0"
num_cpus = "1.16.0"
pbjson = "0.5.1"
diff --git a/rust/processor/src/db/common/models/coin_models/coin_activities.rs b/rust/processor/src/db/common/models/coin_models/coin_activities.rs
index e7467761f..87d562f3a 100644
--- a/rust/processor/src/db/common/models/coin_models/coin_activities.rs
+++ b/rust/processor/src/db/common/models/coin_models/coin_activities.rs
@@ -19,6 +19,7 @@ use crate::{
},
v2_fungible_asset_utils::FeeStatement,
},
+ should_skip,
user_transactions_models::signatures::Signature,
},
schema::coin_activities,
@@ -186,6 +187,16 @@ impl CoinActivity {
}
}
for (index, event) in events.iter().enumerate() {
+ if should_skip(
+ event,
+ if index == 0 {
+ None
+ } else {
+ Some(&events[index - 1])
+ },
+ ) {
+ continue;
+ }
let event_type = event.type_str.clone();
if let Some(parsed_event) =
CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap()
@@ -222,23 +233,44 @@ impl CoinActivity {
transaction_timestamp: chrono::NaiveDateTime,
event_index: i64,
) -> Self {
- let amount = match coin_event {
- CoinEvent::WithdrawCoinEvent(inner) => inner.amount.clone(),
- CoinEvent::DepositCoinEvent(inner) => inner.amount.clone(),
- };
- let event_move_guid = EventGuidResource {
- addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
- creation_num: event.key.as_ref().unwrap().creation_number as i64,
+ let (owner_address, amount, coin_type_option) = match coin_event {
+ CoinEvent::WithdrawCoinEvent(inner) => (
+ standardize_address(&event.key.as_ref().unwrap().account_address),
+ inner.amount.clone(),
+ None,
+ ),
+ CoinEvent::DepositCoinEvent(inner) => (
+ standardize_address(&event.key.as_ref().unwrap().account_address),
+ inner.amount.clone(),
+ None,
+ ),
+ CoinEvent::WithdrawCoinEventV2(inner) => (
+ standardize_address(&inner.account),
+ inner.amount.clone(),
+ Some(inner.coin_type.clone()),
+ ),
+ CoinEvent::DepositCoinEventV2(inner) => (
+ standardize_address(&inner.account),
+ inner.amount.clone(),
+ Some(inner.coin_type.clone()),
+ ),
};
- let coin_type =
+ let coin_type = if let Some(coin_type) = coin_type_option {
+ coin_type
+ } else {
+ let event_move_guid = EventGuidResource {
+ addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
+ creation_num: event.key.as_ref().unwrap().creation_number as i64,
+ };
event_to_coin_type
- .get(&event_move_guid)
- .unwrap_or_else(|| {
- panic!(
- "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
- txn_version, event_move_guid, event_to_coin_type
- )
- }).clone();
+ .get(&event_move_guid)
+ .unwrap_or_else(|| {
+ panic!(
+ "Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
+ txn_version, event_move_guid, event_to_coin_type
+ )
+ }).clone()
+ };
Self {
transaction_version: txn_version,
@@ -247,7 +279,7 @@ impl CoinActivity {
),
event_creation_number: event.key.as_ref().unwrap().creation_number as i64,
event_sequence_number: event.sequence_number as i64,
- owner_address: standardize_address(&event.key.as_ref().unwrap().account_address),
+ owner_address,
coin_type,
amount,
activity_type: event_type.to_string(),
diff --git a/rust/processor/src/db/common/models/coin_models/coin_utils.rs b/rust/processor/src/db/common/models/coin_models/coin_utils.rs
index ec46f532c..2da54e341 100644
--- a/rust/processor/src/db/common/models/coin_models/coin_utils.rs
+++ b/rust/processor/src/db/common/models/coin_models/coin_utils.rs
@@ -151,6 +151,22 @@ pub struct DepositCoinEvent {
pub amount: BigDecimal,
}
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct WithdrawCoinEventV2 {
+ pub coin_type: String,
+ pub account: String,
+ #[serde(deserialize_with = "deserialize_from_string")]
+ pub amount: BigDecimal,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct DepositCoinEventV2 {
+ pub coin_type: String,
+ pub account: String,
+ #[serde(deserialize_with = "deserialize_from_string")]
+ pub amount: BigDecimal,
+}
+
pub struct CoinInfoType {
coin_type: String,
creator_address: String,
@@ -313,6 +329,8 @@ impl CoinResource {
pub enum CoinEvent {
WithdrawCoinEvent(WithdrawCoinEvent),
DepositCoinEvent(DepositCoinEvent),
+ WithdrawCoinEventV2(WithdrawCoinEventV2),
+ DepositCoinEventV2(DepositCoinEventV2),
}
impl CoinEvent {
@@ -324,6 +342,12 @@ impl CoinEvent {
"0x1::coin::DepositEvent" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEvent(inner)))
},
+ "0x1::coin::CoinWithdraw" => {
+ serde_json::from_str(data).map(|inner| Some(CoinEvent::WithdrawCoinEventV2(inner)))
+ },
+ "0x1::coin::CoinDeposit" => {
+ serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEventV2(inner)))
+ },
_ => Ok(None),
}
.context(format!(
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs
index 9dd5afbc8..d4f1e52fe 100644
--- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs
+++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_activities.rs
@@ -160,6 +160,16 @@ impl FungibleAssetActivity {
inner.amount.clone(),
None,
),
+ CoinEvent::WithdrawCoinEventV2(inner) => (
+ standardize_address(&inner.account),
+ inner.amount.clone(),
+ Some(inner.coin_type.clone()),
+ ),
+ CoinEvent::DepositCoinEventV2(inner) => (
+ standardize_address(&inner.account),
+ inner.amount.clone(),
+ Some(inner.coin_type.clone()),
+ ),
};
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs
index cf80f3fc0..e26c2ee70 100644
--- a/rust/processor/src/db/common/models/mod.rs
+++ b/rust/processor/src/db/common/models/mod.rs
@@ -1,6 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
+use ahash::AHashMap;
+use aptos_protos::transaction::v1::Event;
+use lazy_static::lazy_static;
+
pub mod account_transaction_models;
pub mod ans_models;
pub mod coin_models;
@@ -16,3 +20,92 @@ pub mod token_models;
pub mod token_v2_models;
pub mod transaction_metadata_model;
pub mod user_transactions_models;
+
+lazy_static! {
+ pub static ref V1_TO_V2_MAPPING: AHashMap<&'static str, &'static str> = {
+ vec![
+ (
+ "0x1::aptos_governance::VoteEvent",
+ "0x1::aptos_governance::Vote",
+ ),
+ ("0x1::coin::CoinDepositEvent", "0x1::coin::CoinDeposit"),
+ ("0x1::coin::CoinWithdrawEvent", "0x1::coin::CoinWithdraw"),
+ (
+ "0x1::delegation_pool::AddStakeEvent",
+ "0x1::delegation_pool::AddStake",
+ ),
+ (
+ "0x1::delegation_pool::UnlockStakeEvent",
+ "0x1::delegation_pool::UnlockStake",
+ ),
+ (
+ "0x1::delegation_pool::WithdrawStakeEvent",
+ "0x1::delegation_pool::WithdrawStake",
+ ),
+ (
+ "0x1::delegation_pool::ReactivateStakeEvent",
+ "0x1::delegation_pool::ReactivateStake",
+ ),
+ ("0x1::object::TransferEvent", "0x1::object::Transfer"),
+ (
+ "0x1::stake::DistributeRewardsEvent",
+ "0x1::stake::DistributeRewards",
+ ),
+ ("0x3::token::MintTokenEvent", "0x3::token::MintToken"),
+ ("0x3::token::BurnTokenEvent", "0x3::token::BurnToken"),
+ (
+ "0x3::token::MutateTokenPropertyMapEvent",
+ "0x3::token::MutateTokenPropertyMap",
+ ),
+ ("0x3::token::WithdrawEvent", "0x3::token::Withdraw"),
+ ("0x3::token::DepositEvent", "0x3::token::Deposit"),
+ (
+ "0x3::token_transfers::TokenOfferEvent",
+ "0x3::token_transfers::TokenOffer",
+ ),
+ (
+ "0x3::token_transfers::TokenCancelOfferEvent",
+ "0x3::token_transfers::TokenCancelOffer",
+ ),
+ (
+ "0x3::token_transfers::TokenClaimEvent",
+ "0x3::token_transfers::TokenClaim",
+ ),
+ ("0x4::token::MutationEvent", "0x4::token::Mutation"),
+ ]
+ .into_iter()
+ .collect()
+ };
+}
+
+pub(crate) fn should_skip(event: &Event, prev_event: Option<&Event>) -> bool {
+ if let (Some(prev_event_type), Some(prev_event)) =
+ (V1_TO_V2_MAPPING.get(&event.type_str.as_str()), prev_event)
+ {
+ *prev_event_type == prev_event.type_str
+ } else {
+ false
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ // Tests are to make sure
+ // - only when previous processed event is event v2, current event v1 can be skipped
+ #[test]
+ fn test_should_not_break_for_length() {
+ let events = [
+ Event {
+ type_str: "0x4::token::Mutation".to_string(),
+ ..Event::default()
+ },
+ Event {
+ type_str: "0x4::token::MutationEvent".to_string(),
+ ..Event::default()
+ },
+ ];
+ assert!(!should_skip(&events[0], None));
+ assert!(should_skip(&events[1], Some(&events[0])));
+ }
+}
diff --git a/rust/processor/src/db/common/models/stake_models/delegator_activities.rs b/rust/processor/src/db/common/models/stake_models/delegator_activities.rs
index e087480bb..f32f8ffb6 100644
--- a/rust/processor/src/db/common/models/stake_models/delegator_activities.rs
+++ b/rust/processor/src/db/common/models/stake_models/delegator_activities.rs
@@ -5,6 +5,7 @@
use super::stake_utils::StakeEvent;
use crate::{
+ db::common::models::should_skip,
schema::delegated_staking_activities,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
@@ -58,6 +59,16 @@ impl DelegatedStakingActivity {
if let Some(staking_event) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
+ if should_skip(
+ event,
+ if index == 0 {
+ None
+ } else {
+ Some(&events[index - 1])
+ },
+ ) {
+ continue;
+ }
let activity = match staking_event {
StakeEvent::AddStakeEvent(inner) => DelegatedStakingActivity {
transaction_version: txn_version,
diff --git a/rust/processor/src/db/common/models/stake_models/proposal_votes.rs b/rust/processor/src/db/common/models/stake_models/proposal_votes.rs
index ebe473e0c..0868d571c 100644
--- a/rust/processor/src/db/common/models/stake_models/proposal_votes.rs
+++ b/rust/processor/src/db/common/models/stake_models/proposal_votes.rs
@@ -6,6 +6,7 @@
use super::stake_utils::StakeEvent;
use crate::{
+ db::common::models::should_skip,
schema::proposal_votes,
utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
@@ -49,10 +50,20 @@ impl ProposalVote {
let txn_version = transaction.version as i64;
if let TxnData::User(user_txn) = txn_data {
- for event in &user_txn.events {
+ for (index, event) in user_txn.events.iter().enumerate() {
if let Some(StakeEvent::GovernanceVoteEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
+ if should_skip(
+ event,
+ if index == 0 {
+ None
+ } else {
+ Some(&user_txn.events[index - 1])
+ },
+ ) {
+ continue;
+ };
proposal_votes.push(Self {
transaction_version: txn_version,
proposal_id: ev.proposal_id as i64,
diff --git a/rust/processor/src/db/common/models/stake_models/stake_utils.rs b/rust/processor/src/db/common/models/stake_models/stake_utils.rs
index f623d3ffb..eed532907 100644
--- a/rust/processor/src/db/common/models/stake_models/stake_utils.rs
+++ b/rust/processor/src/db/common/models/stake_models/stake_utils.rs
@@ -200,21 +200,24 @@ pub enum StakeEvent {
impl StakeEvent {
pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result