Skip to content

Commit

Permalink
[event_v2] indexer logic update to handle migration (#360)
Browse files Browse the repository at this point in the history
* [event_v2] an example code for event v2 migration

* event v2 indexing.

---------

Co-authored-by: Larry Liu <[email protected]>
  • Loading branch information
lightmark and larry-aptos committed Nov 13, 2024
1 parent da43d48 commit 9313b13
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 37 deletions.
14 changes: 12 additions & 2 deletions python/processors/nft_orderbooks/parsers/okx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"okx_listing_utils::CancelListingEvent",
]
)
DEPOSIT_EVENT_V1 = "0x3::token::DepositEvent"
DEPOSIT_EVENT_V2 = "0x3::token::Deposit"


def parse_marketplace_events(
Expand Down Expand Up @@ -141,8 +143,16 @@ def parse_marketplace_events(
def get_token_data_from_deposit_events(user_transaction) -> Dict[str, TokenDataIdType]:
# Extract deposit events, which contain token metadata
deposit_events: Dict[str, TokenDataIdType] = {}
for event in user_transaction.events:
if event.type_str != "0x3::token::DepositEvent":
for idx, event in enumerate(user_transaction.events):
if event.type_str != DEPOSIT_EVENT_V1 and event.type_str != DEPOSIT_EVENT_V2:
continue
# Current event is either DEPOSIT_EVENT_V1 or DEPOSIT_EVENT_V2.
if (
idx > 0
# skip if prior event is V2 deposit event.
and user_transaction.events[idx - 1].type_str == DEPOSIT_EVENT_V2
and event.type_str == DEPOSIT_EVENT_V1
):
continue
account_address = standardize_address(event_utils.get_account_address(event))
data = json.loads(event.data)
Expand Down
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ google-cloud-googleapis = "0.10.0"
google-cloud-pubsub = "0.18.0"
hex = "0.4.3"
itertools = "0.12.1"
lazy_static = "1.4.0"
jemallocator = { version = "0.5.0", features = [
"profiling",
"unprefixed_malloc_on_supported_platforms",
] }
json-structural-diff = "0.1.0"
assert-json-diff = "2.0.2"
kanal = { version = "0.1.0-pre8", features = ["async"] }
lazy_static = "1.4.0"
once_cell = "1.10.0"
num_cpus = "1.16.0"
pbjson = "0.5.1"
Expand Down
53 changes: 37 additions & 16 deletions rust/processor/src/db/common/models/coin_models/coin_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,23 +228,44 @@ impl CoinActivity {
transaction_timestamp: chrono::NaiveDateTime,
event_index: i64,
) -> Self {
let amount = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => inner.amount.clone(),
CoinEvent::DepositCoinEvent(inner) => inner.amount.clone(),
};
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
let (owner_address, amount, coin_type_option) = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
None,
),
CoinEvent::DepositCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
None,
),
CoinEvent::WithdrawCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
CoinEvent::DepositCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
};
let coin_type =
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
} else {
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
};
event_to_coin_type
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone();
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone()
};

Self {
transaction_version: txn_version,
Expand All @@ -253,7 +274,7 @@ impl CoinActivity {
),
event_creation_number: event.key.as_ref().unwrap().creation_number as i64,
event_sequence_number: event.sequence_number as i64,
owner_address: standardize_address(&event.key.as_ref().unwrap().account_address),
owner_address,
coin_type,
amount,
activity_type: event_type.to_string(),
Expand Down
24 changes: 24 additions & 0 deletions rust/processor/src/db/common/models/coin_models/coin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,22 @@ pub struct DepositCoinEvent {
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WithdrawCoinEventV2 {
pub coin_type: String,
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DepositCoinEventV2 {
pub coin_type: String,
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

pub struct CoinInfoType {
coin_type: String,
creator_address: String,
Expand Down Expand Up @@ -319,6 +335,8 @@ impl CoinResource {
pub enum CoinEvent {
WithdrawCoinEvent(WithdrawCoinEvent),
DepositCoinEvent(DepositCoinEvent),
WithdrawCoinEventV2(WithdrawCoinEventV2),
DepositCoinEventV2(DepositCoinEventV2),
}

impl CoinEvent {
Expand All @@ -330,6 +348,12 @@ impl CoinEvent {
"0x1::coin::DepositEvent" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEvent(inner)))
},
"0x1::coin::CoinWithdraw" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::WithdrawCoinEventV2(inner)))
},
"0x1::coin::CoinDeposit" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEventV2(inner)))
},
_ => Ok(None),
}
.context(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ impl FungibleAssetActivity {
inner.amount.to_string(),
None,
),
CoinEvent::WithdrawCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.to_string(),
Some(inner.coin_type.clone()),
),
CoinEvent::DepositCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.to_string(),
Some(inner.coin_type.clone()),
),
};
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ impl FungibleAssetActivity {
inner.amount.clone(),
None,
),
CoinEvent::WithdrawCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
CoinEvent::DepositCoinEventV2(inner) => (
standardize_address(&inner.account),
inner.amount.clone(),
Some(inner.coin_type.clone()),
),
};
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ProposalVote {
let txn_version = transaction.version as i64;

if let TxnData::User(user_txn) = txn_data {
for event in &user_txn.events {
for event in user_txn.events.iter() {
if let Some(StakeEvent::GovernanceVoteEvent(ev)) =
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
Expand Down
17 changes: 10 additions & 7 deletions rust/processor/src/db/common/models/stake_models/stake_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,24 @@ pub enum StakeEvent {
impl StakeEvent {
pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<Self>> {
match data_type {
"0x1::aptos_governance::VoteEvent" => {
"0x1::aptos_governance::VoteEvent" | "0x1::aptos_governance::Vote" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::GovernanceVoteEvent(inner)))
},
"0x1::stake::DistributeRewardsEvent" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner))),
"0x1::delegation_pool::AddStakeEvent" => {
"0x1::stake::DistributeRewardsEvent" | "0x1::stake::DistributeRewards" => {
serde_json::from_str(data)
.map(|inner| Some(StakeEvent::DistributeRewardsEvent(inner)))
},
"0x1::delegation_pool::AddStakeEvent" | "0x1::delegation_pool::AddStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::AddStakeEvent(inner)))
},
"0x1::delegation_pool::UnlockStakeEvent" => {
"0x1::delegation_pool::UnlockStakeEvent" | "0x1::delegation_pool::UnlockStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::UnlockStakeEvent(inner)))
},
"0x1::delegation_pool::WithdrawStakeEvent" => {
"0x1::delegation_pool::WithdrawStakeEvent" | "0x1::delegation_pool::WithdrawStake" => {
serde_json::from_str(data).map(|inner| Some(StakeEvent::WithdrawStakeEvent(inner)))
},
"0x1::delegation_pool::ReactivateStakeEvent" => serde_json::from_str(data)
"0x1::delegation_pool::ReactivateStakeEvent"
| "0x1::delegation_pool::ReactivateStake" => serde_json::from_str(data)
.map(|inner| Some(StakeEvent::ReactivateStakeEvent(inner))),
_ => Ok(None),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::Mint(inner) => TokenActivityHelper {
token_data_id: &inner.id,
property_version: BigDecimal::zero(),
from_address: Some(inner.get_account()),
to_address: None,
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::BurnTokenEvent(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
Expand All @@ -129,6 +138,15 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::Burn(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
from_address: Some(inner.get_account()),
to_address: None,
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::MutateTokenPropertyMapEvent(inner) => TokenActivityHelper {
token_data_id: &inner.new_id.token_data_id,
property_version: inner.new_id.property_version.clone(),
Expand All @@ -138,6 +156,15 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::MutatePropertyMap(inner) => TokenActivityHelper {
token_data_id: &inner.new_id.token_data_id,
property_version: inner.new_id.property_version.clone(),
from_address: Some(inner.get_account()),
to_address: None,
token_amount: BigDecimal::zero(),
coin_type: None,
coin_amount: None,
},
TokenEvent::WithdrawTokenEvent(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
Expand All @@ -147,6 +174,15 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::TokenWithdraw(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
from_address: Some(inner.get_account()),
to_address: None,
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::DepositTokenEvent(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
Expand All @@ -156,6 +192,15 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::TokenDeposit(inner) => TokenActivityHelper {
token_data_id: &inner.id.token_data_id,
property_version: inner.id.property_version.clone(),
from_address: None,
to_address: Some(inner.get_account()),
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::OfferTokenEvent(inner) => TokenActivityHelper {
token_data_id: &inner.token_id.token_data_id,
property_version: inner.token_id.property_version.clone(),
Expand Down Expand Up @@ -183,6 +228,33 @@ impl TokenActivity {
coin_type: None,
coin_amount: None,
},
TokenEvent::Offer(inner) => TokenActivityHelper {
token_data_id: &inner.token_id.token_data_id,
property_version: inner.token_id.property_version.clone(),
from_address: Some(inner.get_from_address()),
to_address: Some(inner.get_to_address()),
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::CancelOffer(inner) => TokenActivityHelper {
token_data_id: &inner.token_id.token_data_id,
property_version: inner.token_id.property_version.clone(),
from_address: Some(inner.get_from_address()),
to_address: Some(inner.get_to_address()),
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
TokenEvent::Claim(inner) => TokenActivityHelper {
token_data_id: &inner.token_id.token_data_id,
property_version: inner.token_id.property_version.clone(),
from_address: Some(inner.get_from_address()),
to_address: Some(inner.get_to_address()),
token_amount: inner.amount.clone(),
coin_type: None,
coin_amount: None,
},
};
let token_data_id = token_activity_helper.token_data_id;
Self {
Expand Down
Loading

0 comments on commit 9313b13

Please sign in to comment.