Skip to content

Commit

Permalink
[event_v2] indexer logic update to handle migration (#360)
Browse files Browse the repository at this point in the history
* [event_v2] an example code for event v2 migration

* event v2 indexing.

---------

Co-authored-by: Larry Liu <[email protected]>
  • Loading branch information
lightmark and larry-aptos authored May 16, 2024
1 parent 2eab97f commit 3551f40
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 69 deletions.
13 changes: 11 additions & 2 deletions python/processors/nft_orderbooks/parsers/okx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"okx_listing_utils::CancelListingEvent",
]
)
DEPOSITE_EVENT_V1 = "0x3::token::DepositEvent"
DEPOSITE_EVENT_V2 = "0x3::token::Deposit"


def parse_marketplace_events(
Expand Down Expand Up @@ -141,8 +143,15 @@ def parse_marketplace_events(
def get_token_data_from_deposit_events(user_transaction) -> Dict[str, TokenDataIdType]:
# Extract deposit events, which contain token metadata
deposit_events: Dict[str, TokenDataIdType] = {}
for event in user_transaction.events:
if event.type_str != "0x3::token::DepositEvent":
for idx, event in enumerate(user_transaction.events):
if event.type_str != DEPOSITE_EVENT_V1 and event.type_str != DEPOSITE_EVENT_V2:
continue
# Current event is either DEPOSITE_EVENT_V1 or DEPOSITE_EVENT_V2.
if (
idx > 0
# skip if prior event is V2 deposit event.
and user_transaction.events[idx - 1].type_str == DEPOSITE_EVENT_V2
):
continue
account_address = standardize_address(event_utils.get_account_address(event))
data = json.loads(event.data)
Expand Down
55 changes: 38 additions & 17 deletions rust/processor/src/models/coin_models/coin_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
},
v2_fungible_asset_utils::FeeStatement,
},
should_skip,
user_transactions_models::signatures::Signature,
},
processors::coin_processor::APTOS_COIN_TYPE_STR,
Expand Down Expand Up @@ -198,14 +199,18 @@ impl CoinActivity {
}
}
for (index, event) in events.iter().enumerate() {
if should_skip(index, event, events) {
continue;
}
let event_type = event.type_str.clone();
if let Some(parsed_event) =
if let Some((parsed_event, coin_type_option)) =
CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap()
{
coin_activities.push(Self::from_parsed_event(
&event_type,
event,
&parsed_event,
coin_type_option,
txn_version,
&all_event_to_coin_type,
block_height,
Expand All @@ -228,30 +233,46 @@ impl CoinActivity {
event_type: &str,
event: &EventPB,
coin_event: &CoinEvent,
coin_type_option: Option<String>,
txn_version: i64,
event_to_coin_type: &EventToCoinType,
block_height: i64,
entry_function_id_str: &Option<String>,
transaction_timestamp: chrono::NaiveDateTime,
event_index: i64,
) -> Self {
let amount = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => inner.amount.clone(),
CoinEvent::DepositCoinEvent(inner) => inner.amount.clone(),
};
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
let (owner_address, amount) = match coin_event {
CoinEvent::WithdrawCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
),
CoinEvent::DepositCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
),
CoinEvent::WithdrawCoinEventV2(inner) => {
(standardize_address(&inner.account), inner.amount.clone())
},
CoinEvent::DepositCoinEventV2(inner) => {
(standardize_address(&inner.account), inner.amount.clone())
},
};
let coin_type =
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
} else {
let event_move_guid = EventGuidResource {
addr: standardize_address(event.key.as_ref().unwrap().account_address.as_str()),
creation_num: event.key.as_ref().unwrap().creation_number as i64,
};
event_to_coin_type
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone();
.get(&event_move_guid)
.unwrap_or_else(|| {
panic!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
)
}).clone()
};

Self {
transaction_version: txn_version,
Expand All @@ -260,7 +281,7 @@ impl CoinActivity {
),
event_creation_number: event.key.as_ref().unwrap().creation_number as i64,
event_sequence_number: event.sequence_number as i64,
owner_address: standardize_address(&event.key.as_ref().unwrap().account_address),
owner_address,
coin_type,
amount,
activity_type: event_type.to_string(),
Expand Down
46 changes: 41 additions & 5 deletions rust/processor/src/models/coin_models/coin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ pub struct DepositCoinEvent {
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WithdrawCoinEventV2 {
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DepositCoinEventV2 {
pub account: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

pub struct CoinInfoType {
coin_type: String,
creator_address: String,
Expand Down Expand Up @@ -288,16 +302,38 @@ impl CoinResource {
pub enum CoinEvent {
WithdrawCoinEvent(WithdrawCoinEvent),
DepositCoinEvent(DepositCoinEvent),
WithdrawCoinEventV2(WithdrawCoinEventV2),
DepositCoinEventV2(DepositCoinEventV2),
}

impl CoinEvent {
pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<CoinEvent>> {
pub fn from_event(
data_type: &str,
data: &str,
txn_version: i64,
) -> Result<Option<(CoinEvent, Option<String>)>> {
match data_type {
"0x1::coin::WithdrawEvent" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::WithdrawCoinEvent(inner)))
"0x1::coin::WithdrawEvent" => serde_json::from_str(data)
.map(|inner| Some((CoinEvent::WithdrawCoinEvent(inner), None))),
"0x1::coin::DepositEvent" => serde_json::from_str(data)
.map(|inner| Some((CoinEvent::WithdrawCoinEvent(inner), None))),
t if t.starts_with("0x1::coin::Withdraw") => {
let inner_type_start = t.find('<').unwrap();
serde_json::from_str(data).map(|inner| {
Some((
CoinEvent::WithdrawCoinEventV2(inner),
Some(t[inner_type_start + 1..t.len() - 1].into()),
))
})
},
"0x1::coin::DepositEvent" => {
serde_json::from_str(data).map(|inner| Some(CoinEvent::DepositCoinEvent(inner)))
t if t.starts_with("0x1::coin::Deposit") => {
let inner_type_start = t.find('<').unwrap();
serde_json::from_str(data).map(|inner| {
Some((
CoinEvent::WithdrawCoinEventV2(inner),
Some(t[inner_type_start + 1..t.len() - 1].into()),
))
})
},
_ => Ok(None),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,36 @@ impl FungibleAssetActivity {
if let Some(fa_event) =
&FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)?
{
let storage_id = standardize_address(&event.key.as_ref().unwrap().account_address);
let (storage_id, is_frozen, amount) = match fa_event {
FungibleAssetEvent::WithdrawEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
None,
Some(inner.amount.clone()),
),
FungibleAssetEvent::DepositEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
None,
Some(inner.amount.clone()),
),
FungibleAssetEvent::FrozenEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
Some(inner.frozen),
None,
),
FungibleAssetEvent::WithdrawEventV2(inner) => (
standardize_address(&inner.store),
None,
Some(inner.amount.clone()),
),
FungibleAssetEvent::DepositEventV2(inner) => (
standardize_address(&inner.store),
None,
Some(inner.amount.clone()),
),
FungibleAssetEvent::FrozenEventV2(inner) => {
(standardize_address(&inner.store), Some(inner.frozen), None)
},
};

// The event account address will also help us find fungible store which tells us where to find
// the metadata
Expand All @@ -81,12 +110,6 @@ impl FungibleAssetActivity {
let fungible_asset = object_metadata.fungible_asset_store.as_ref().unwrap();
let asset_type = fungible_asset.metadata.get_reference_address();

let (is_frozen, amount) = match fa_event {
FungibleAssetEvent::WithdrawEvent(inner) => (None, Some(inner.amount.clone())),
FungibleAssetEvent::DepositEvent(inner) => (None, Some(inner.amount.clone())),
FungibleAssetEvent::FrozenEvent(inner) => (Some(inner.frozen), None),
};

return Ok(Some(Self {
transaction_version: txn_version,
event_index,
Expand Down Expand Up @@ -119,36 +142,54 @@ impl FungibleAssetActivity {
event_to_coin_type: &EventToCoinType,
event_index: i64,
) -> anyhow::Result<Option<Self>> {
if let Some(inner) =
if let Some((inner, coin_type_option)) =
CoinEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
{
let amount = match inner {
CoinEvent::WithdrawCoinEvent(inner) => inner.amount,
CoinEvent::DepositCoinEvent(inner) => inner.amount,
};
let event_key = event.key.as_ref().context("event must have a key")?;
let event_move_guid = EventGuidResource {
addr: standardize_address(event_key.account_address.as_str()),
creation_num: event_key.creation_number as i64,
let (owner_address, amount) = match inner {
CoinEvent::WithdrawCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
),
CoinEvent::DepositCoinEvent(inner) => (
standardize_address(&event.key.as_ref().unwrap().account_address),
inner.amount.clone(),
),
CoinEvent::WithdrawCoinEventV2(inner) => {
(standardize_address(&inner.account), inner.amount.clone())
},
CoinEvent::DepositCoinEventV2(inner) => {
(standardize_address(&inner.account), inner.amount.clone())
},
};
// Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists.
// If it doesn't exist, skip.
let coin_type = match event_to_coin_type.get(&event_move_guid) {
Some(coin_type) => coin_type.clone(),
None => {
tracing::warn!(
let coin_type = if let Some(coin_type) = coin_type_option {
coin_type
} else {
let event_key = event.key.as_ref().context("event must have a key")?;
let event_move_guid = EventGuidResource {
addr: standardize_address(event_key.account_address.as_str()),
creation_num: event_key.creation_number as i64,
};
// Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists.
// If it doesn't exist, skip.
match event_to_coin_type.get(&event_move_guid) {
Some(coin_type) => coin_type.clone(),
None => {
tracing::warn!(
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
txn_version, event_move_guid, event_to_coin_type
);
return Ok(None);
},
return Ok(None);
},
}
};

let storage_id =
CoinInfoType::get_storage_id(coin_type.as_str(), event_move_guid.addr.as_str());
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());

Ok(Some(Self {
transaction_version: txn_version,
event_index,
owner_address: event_move_guid.addr,
owner_address,
storage_id,
asset_type: coin_type,
is_frozen: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,26 @@ pub struct FrozenEvent {
pub frozen: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DepositEventV2 {
pub store: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WithdrawEventV2 {
pub store: String,
#[serde(deserialize_with = "deserialize_from_string")]
pub amount: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrozenEventV2 {
pub store: String,
pub frozen: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum V2FungibleAssetResource {
FungibleAssetMetadata(FungibleAssetMetadata),
Expand Down Expand Up @@ -256,6 +276,9 @@ pub enum FungibleAssetEvent {
DepositEvent(DepositEvent),
WithdrawEvent(WithdrawEvent),
FrozenEvent(FrozenEvent),
DepositEventV2(DepositEventV2),
WithdrawEventV2(WithdrawEventV2),
FrozenEventV2(FrozenEventV2),
}

impl FungibleAssetEvent {
Expand All @@ -270,6 +293,15 @@ impl FungibleAssetEvent {
"0x1::fungible_asset::FrozenEvent" => {
serde_json::from_str(data).map(|inner| Some(Self::FrozenEvent(inner)))
},
"0x1::fungible_asset::Deposit" => {
serde_json::from_str(data).map(|inner| Some(Self::DepositEventV2(inner)))
},
"0x1::fungible_asset::Withdraw" => {
serde_json::from_str(data).map(|inner| Some(Self::WithdrawEventV2(inner)))
},
"0x1::fungible_asset::Frozen" => {
serde_json::from_str(data).map(|inner| Some(Self::FrozenEventV2(inner)))
},
_ => Ok(None),
}
.context(format!(
Expand Down
Loading

0 comments on commit 3551f40

Please sign in to comment.