From ee05799e7c9021e4b00a3187a3b8e222e3f173f1 Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Mon, 29 Jul 2024 14:30:11 -0700 Subject: [PATCH] add missing indexing delete reousrces (#475) --- .../parquet_v2_token_ownerships.rs | 322 +++++++++++++++--- .../parquet_token_v2_processor.rs | 126 ++++++- 2 files changed, 399 insertions(+), 49 deletions(-) diff --git a/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs index 855b324a3..267d332c0 100644 --- a/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs +++ b/rust/processor/src/db/common/models/token_v2_models/parquet_v2_token_ownerships.rs @@ -9,18 +9,22 @@ use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, db::common::models::{ fungible_asset_models::parquet_v2_fungible_asset_balances::DEFAULT_AMOUNT_VALUE, - object_models::v2_object_utils::ObjectAggregatedDataMapping, + object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, token_v2_models::{ - parquet_v2_token_datas::TokenDataV2, v2_token_ownerships::CurrentTokenOwnershipV2, - v2_token_utils::TokenStandard, + parquet_v2_token_datas::TokenDataV2, + v2_token_ownerships::{CurrentTokenOwnershipV2, NFTOwnershipV2}, + v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, }, }, utils::util::{ensure_not_negative, standardize_address}, }; +use ahash::AHashMap; use allocative_derive::Allocative; use anyhow::Context; -use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; +use aptos_protos::transaction::v1::{ + DeleteResource, DeleteTableItem, WriteResource, WriteTableItem, +}; use bigdecimal::{BigDecimal, ToPrimitive, Zero}; use field_count::FieldCount; use parquet_derive::ParquetRecordWriter; @@ -72,7 +76,6 @@ impl TokenOwnershipV2 { object_metadatas: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { let mut ownerships = vec![]; - // let mut current_ownerships = AHashMap::new(); let object_data = object_metadatas .get(&token_data.token_data_id) @@ -136,6 +139,68 @@ impl TokenOwnershipV2 { Ok(ownerships) } + async fn get_burned_nft_v2_helper( + token_address: &str, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + prior_nft_ownership: &AHashMap, + tokens_burned: &TokenV2Burned, + ) -> anyhow::Result> { + let token_address = standardize_address(token_address); + if let Some(burn_event) = tokens_burned.get(&token_address) { + // 1. Try to lookup token address in burn event mapping + let previous_owner = + if let Some(previous_owner) = burn_event.get_previous_owner_address() { + previous_owner + } else { + // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. + // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. + match prior_nft_ownership.get(&token_address) { + Some(inner) => inner.owner_address.clone(), + None => DEFAULT_OWNER_ADDRESS.to_string(), + } + }; + + let token_data_id = token_address.clone(); + let storage_id = token_data_id.clone(); + + return Ok(Some(( + Self { + txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, + owner_address: Some(previous_owner.clone()), + storage_id: storage_id.clone(), + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: None, // default + token_standard: TokenStandard::V2.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, // default + }, + CurrentTokenOwnershipV2 { + token_data_id, + property_version_v1: BigDecimal::zero(), + owner_address: previous_owner, + storage_id, + amount: BigDecimal::zero(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: None, // default + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: None, // default + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + non_transferrable_by_owner: None, // default + }, + ))); + } + Ok(None) + } + /// We want to track tokens in any offer/claims and tokenstore pub fn get_v1_from_delete_table_item( table_item: &DeleteTableItem, @@ -143,7 +208,7 @@ impl TokenOwnershipV2 { write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { + ) -> anyhow::Result)>> { let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_id = match TokenWriteSet::from_table_item_type( @@ -161,7 +226,7 @@ impl TokenOwnershipV2 { let token_data_id = token_data_id_struct.to_id(); let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (_, owner_address, table_type) = match maybe_table_metadata { + let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { Some(tm) => { if tm.table_type != "0x3::token::TokenStore" { return Ok(None); @@ -190,21 +255,24 @@ impl TokenOwnershipV2 { None => (None, None, None), }; - Ok(Some(Self { - txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version.to_u64().unwrap(), - owner_address, - storage_id: table_handle, - amount: DEFAULT_AMOUNT_VALUE.clone(), - table_type_v1: table_type, - token_properties_mutated_v1: None, - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - })) + Ok(Some(( + Self { + txn_version, + write_set_change_index, + token_data_id, + property_version_v1: token_id_struct.property_version.to_u64().unwrap(), + owner_address, + storage_id: table_handle, + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: table_type, + token_properties_mutated_v1: None, + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + }, + curr_token_ownership, + ))) } else { Ok(None) } @@ -217,7 +285,7 @@ impl TokenOwnershipV2 { write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { + ) -> anyhow::Result)>> { let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token = match TokenWriteSet::from_table_item_type( @@ -237,36 +305,204 @@ impl TokenOwnershipV2 { let token_data_id = token_data_id_struct.to_id(); let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (owner_address, table_type) = match maybe_table_metadata { + let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { Some(tm) => { if tm.table_type != "0x3::token::TokenStore" { return Ok(None); } let owner_address = tm.get_owner_address(); - (Some(owner_address), Some(tm.table_type.clone())) + ( + Some(CurrentTokenOwnershipV2 { + token_data_id: token_data_id.clone(), + property_version_v1: token_id_struct.property_version.clone(), + owner_address: owner_address.clone(), + storage_id: table_handle.clone(), + amount: amount.clone(), + table_type_v1: Some(tm.table_type.clone()), + token_properties_mutated_v1: Some(token.token_properties.clone()), + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + is_fungible_v2: None, + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + }), + Some(owner_address), + Some(tm.table_type.clone()), + ) }, - None => (None, None), + None => (None, None, None), }; - Ok(Some(Self { - txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version.to_u64().unwrap(), - owner_address, - storage_id: table_handle, - amount: amount.to_string(), - table_type_v1: table_type, - token_properties_mutated_v1: Some( - canonical_json::to_string(&token.token_properties).unwrap(), - ), - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - block_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - })) + Ok(Some(( + Self { + txn_version, + write_set_change_index, + token_data_id, + property_version_v1: token_id_struct.property_version.to_u64().unwrap(), + owner_address, + storage_id: table_handle, + amount: amount.to_string(), + table_type_v1: table_type, + token_properties_mutated_v1: Some( + canonical_json::to_string(&token.token_properties).unwrap(), + ), + is_soulbound_v2: None, + token_standard: TokenStandard::V1.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, + }, + curr_token_ownership, + ))) } else { Ok(None) } } + + pub async fn get_burned_nft_v2_from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + prior_nft_ownership: &AHashMap, + tokens_burned: &TokenV2Burned, + object_metadatas: &ObjectAggregatedDataMapping, + ) -> anyhow::Result> { + let token_data_id = standardize_address(&write_resource.address.to_string()); + if tokens_burned + .get(&standardize_address(&token_data_id)) + .is_some() + { + if let Some(object) = + &ObjectWithMetadata::from_write_resource(write_resource, txn_version)? + { + let object_core = &object.object_core; + let owner_address = object_core.get_owner_address(); + let storage_id = token_data_id.clone(); + + // is_soulbound currently means if an object is completely untransferrable + // OR if only admin can transfer. Only the former is true soulbound but + // people might already be using it with the latter meaning so let's include both. + let is_soulbound = if object_metadatas + .get(&token_data_id) + .map(|obj| obj.untransferable.as_ref()) + .is_some() + { + true + } else { + !object_core.allow_ungated_transfer + }; + let non_transferrable_by_owner = !object_core.allow_ungated_transfer; + + return Ok(Some(( + Self { + txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, + owner_address: Some(owner_address.clone()), + storage_id: storage_id.clone(), + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: Some(is_soulbound), + token_standard: TokenStandard::V2.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: Some(non_transferrable_by_owner), + }, + CurrentTokenOwnershipV2 { + token_data_id, + property_version_v1: BigDecimal::zero(), + owner_address, + storage_id, + amount: BigDecimal::zero(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: Some(is_soulbound), + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: Some(false), + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + non_transferrable_by_owner: Some(non_transferrable_by_owner), + }, + ))); + } else { + return Self::get_burned_nft_v2_helper( + &token_data_id, + txn_version, + write_set_change_index, + txn_timestamp, + prior_nft_ownership, + tokens_burned, + ) + .await; + } + } + Ok(None) + } + + pub fn get_burned_nft_v2_from_delete_resource( + delete_resource: &DeleteResource, + txn_version: i64, + write_set_change_index: i64, + txn_timestamp: chrono::NaiveDateTime, + prior_nft_ownership: &AHashMap, + tokens_burned: &TokenV2Burned, + ) -> anyhow::Result> { + let token_address = standardize_address(&delete_resource.address.to_string()); + let token_address = standardize_address(&token_address); + if let Some(burn_event) = tokens_burned.get(&token_address) { + // 1. Try to lookup token address in burn event mapping + let previous_owner = + if let Some(previous_owner) = burn_event.get_previous_owner_address() { + previous_owner + } else { + // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. + // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. + match prior_nft_ownership.get(&token_address) { + Some(inner) => inner.owner_address.clone(), + None => { + DEFAULT_OWNER_ADDRESS.to_string() // we don't want to query db to get the previous owner for parquet. + }, + } + }; + + let token_data_id = token_address.clone(); + let storage_id = token_data_id.clone(); + + return Ok(Some(( + Self { + txn_version, + write_set_change_index, + token_data_id: token_data_id.clone(), + property_version_v1: LEGACY_DEFAULT_PROPERTY_VERSION, + owner_address: Some(previous_owner.clone()), + storage_id: storage_id.clone(), + amount: DEFAULT_AMOUNT_VALUE.clone(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: None, // default + token_standard: TokenStandard::V2.to_string(), + block_timestamp: txn_timestamp, + non_transferrable_by_owner: None, // default + }, + CurrentTokenOwnershipV2 { + token_data_id, + property_version_v1: BigDecimal::zero(), + owner_address: previous_owner, + storage_id, + amount: BigDecimal::zero(), + table_type_v1: None, + token_properties_mutated_v1: None, + is_soulbound_v2: None, // default + token_standard: TokenStandard::V2.to_string(), + is_fungible_v2: None, // default + last_transaction_version: txn_version, + last_transaction_timestamp: txn_timestamp, + non_transferrable_by_owner: None, // default + }, + ))); + } + Ok(None) + } } diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs index 723254b3c..9ce0bedd2 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs @@ -15,9 +15,11 @@ use crate::{ token_v2_models::{ parquet_v2_token_datas::TokenDataV2, parquet_v2_token_ownerships::TokenOwnershipV2, + v2_token_ownerships::NFTOwnershipV2, v2_token_utils::{ - AptosCollection, ConcurrentSupply, FixedSupply, PropertyMapModel, TokenIdentifiers, - TokenV2, TransferEvent, UnlimitedSupply, + AptosCollection, Burn, BurnEvent, ConcurrentSupply, FixedSupply, MintEvent, + PropertyMapModel, TokenIdentifiers, TokenV2, TokenV2Burned, TokenV2Minted, + TransferEvent, UnlimitedSupply, }, }, }, @@ -29,7 +31,7 @@ use crate::{ util::{parse_timestamp, standardize_address}, }, }; -use ahash::AHashMap; +use ahash::{AHashMap, AHashSet}; use anyhow::Context; use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction}; use async_trait::async_trait; @@ -176,6 +178,8 @@ async fn parse_v2_token( let mut token_datas_v2 = vec![]; let mut token_ownerships_v2 = vec![]; + // Tracks prior ownership in case a token gets burned + let mut prior_nft_ownership: AHashMap = AHashMap::new(); // Get Metadata for token v2 by object // We want to persist this through the entire batch so that even if a token is burned, // we can still get the object core metadata for it @@ -202,6 +206,10 @@ async fn parse_v2_token( let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); if let TxnData::User(user_txn) = txn_data { + // Get burn events for token v2 by object + let mut tokens_burned: TokenV2Burned = AHashMap::new(); + // Get mint events for token v2 by object + let mut tokens_minted: TokenV2Minted = AHashSet::new(); // Need to do a first pass to get all the objects for wsc in transaction_info.changes.iter() { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { @@ -276,6 +284,20 @@ async fn parse_v2_token( // This needs to be here because we need the metadata above for token activities // and burn / transfer events need to come before the next section for (index, event) in user_txn.events.iter().enumerate() { + if let Some(burn_event) = Burn::from_event(event, txn_version).unwrap() { + tokens_burned.insert(burn_event.get_token_address(), burn_event); + } + if let Some(old_burn_event) = BurnEvent::from_event(event, txn_version).unwrap() { + let burn_event = Burn::new( + standardize_address(event.key.as_ref().unwrap().account_address.as_str()), + old_burn_event.get_token_address(), + "".to_string(), + ); + tokens_burned.insert(burn_event.get_token_address(), burn_event); + } + if let Some(mint_event) = MintEvent::from_event(event, txn_version).unwrap() { + tokens_minted.insert(mint_event.get_token_address()); + } if let Some(transfer_events) = TransferEvent::from_event(event, txn_version).unwrap() { @@ -314,7 +336,7 @@ async fn parse_v2_token( .and_modify(|e| *e += 1) .or_insert(1); } - if let Some(token_ownership) = + if let Some((token_ownership, current_token_ownership)) = TokenOwnershipV2::get_v1_from_write_table_item( table_item, txn_version, @@ -325,6 +347,16 @@ async fn parse_v2_token( .unwrap() { token_ownerships_v2.push(token_ownership); + if let Some(cto) = current_token_ownership { + prior_nft_ownership.insert( + cto.token_data_id.clone(), + NFTOwnershipV2 { + token_data_id: cto.token_data_id.clone(), + owner_address: cto.owner_address.clone(), + is_soulbound: cto.is_soulbound_v2, + }, + ); + } transaction_version_to_struct_count .entry(txn_version) .and_modify(|e| *e += 1) @@ -332,7 +364,7 @@ async fn parse_v2_token( } }, Change::DeleteTableItem(table_item) => { - if let Some(token_ownership) = + if let Some((token_ownership, current_token_ownership)) = TokenOwnershipV2::get_v1_from_delete_table_item( table_item, txn_version, @@ -347,6 +379,16 @@ async fn parse_v2_token( .entry(txn_version) .and_modify(|e| *e += 1) .or_insert(1); + if let Some(cto) = current_token_ownership { + prior_nft_ownership.insert( + cto.token_data_id.clone(), + NFTOwnershipV2 { + token_data_id: cto.token_data_id.clone(), + owner_address: cto.owner_address.clone(), + is_soulbound: cto.is_soulbound_v2, + }, + ); + } } }, Change::WriteResource(resource) => { @@ -365,12 +407,84 @@ async fn parse_v2_token( &token_v2_metadata_helper, ) .unwrap(); + if let Some(current_nft_ownership) = ownerships.first() { + // Note that the first element in ownerships is the current ownership. We need to cache + // it in prior_nft_ownership so that moving forward if we see a burn we'll know + // where it came from. + prior_nft_ownership.insert( + current_nft_ownership.token_data_id.clone(), + NFTOwnershipV2 { + token_data_id: current_nft_ownership.token_data_id.clone(), + owner_address: current_nft_ownership + .owner_address + .as_ref() + .unwrap() + .clone(), + is_soulbound: current_nft_ownership.is_soulbound_v2, + }, + ); + } + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += ownerships.len() as i64 + 1) + .or_insert(1); token_ownerships_v2.append(&mut ownerships); token_datas_v2.push(token_data); + } + // Add burned NFT handling + if let Some((nft_ownership, current_nft_ownership)) = + TokenOwnershipV2::get_burned_nft_v2_from_write_resource( + resource, + txn_version, + wsc_index, + txn_timestamp, + &prior_nft_ownership, + &tokens_burned, + &token_v2_metadata_helper, + ) + .await + .unwrap() + { + token_ownerships_v2.push(nft_ownership); transaction_version_to_struct_count .entry(txn_version) - .and_modify(|e| *e += ownerships.len() as i64 + 1) + .and_modify(|e| *e += 1) + .or_insert(1); + prior_nft_ownership.insert( + current_nft_ownership.token_data_id.clone(), + NFTOwnershipV2 { + token_data_id: current_nft_ownership.token_data_id.clone(), + owner_address: current_nft_ownership.owner_address.clone(), + is_soulbound: current_nft_ownership.is_soulbound_v2, + }, + ); + } + }, + Change::DeleteResource(resource) => { + if let Some((nft_ownership, current_nft_ownership)) = + TokenOwnershipV2::get_burned_nft_v2_from_delete_resource( + resource, + txn_version, + wsc_index, + txn_timestamp, + &prior_nft_ownership, + &tokens_burned, + ) + .unwrap() + { + token_ownerships_v2.push(nft_ownership); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += 1) .or_insert(1); + prior_nft_ownership.insert( + current_nft_ownership.token_data_id.clone(), + NFTOwnershipV2 { + token_data_id: current_nft_ownership.token_data_id.clone(), + owner_address: current_nft_ownership.owner_address.clone(), + is_soulbound: current_nft_ownership.is_soulbound_v2, + }, + ); } }, _ => {},