From 4a7a9754c90639e227f58b03e939c3c2b61d2642 Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Tue, 10 Dec 2024 15:38:03 -0800
Subject: [PATCH] [parquet-sdk][token_v2] migrate token ownership table pt1

---
 .../models/ans_models/raw_ans_lookup_v2.rs | 2 +-
 .../ans_models/raw_ans_primary_name_v2.rs | 3 +-
 .../raw_v2_fungible_asset_activities.rs | 4 +-
 .../raw_v2_fungible_asset_balances.rs | 3 +-
 .../raw_v2_fungible_metadata.rs | 4 +-
 .../db/common/models/token_v2_models/mod.rs | 2 +
 .../raw_v2_token_activities.rs | 3 +-
 .../token_v2_models/raw_v2_token_datas.rs | 5 +-
 .../raw_v2_token_ownerships.rs | 655 +++++++++
 .../models/token_v2_models/v2_token_utils.rs | 433 ++++++
 .../token_v2_models/v2_token_ownerships.rs | 114 +-
 .../ans_models/parquet_ans_lookup_v2.rs | 4 +-
 .../v2_fungible_asset_utils.rs | 5 +-
 .../models/object_models/v2_object_utils.rs | 7 +-
 .../src/db/postgres/models/resources.rs | 7 +-
 .../db/postgres/models/token_v2_models/mod.rs | 1 -
 .../models/token_v2_models/v2_collections.rs | 4 +-
 .../token_v2_models/v2_token_ownerships.rs | 1174 +++++++++--------
 .../models/token_v2_models/v2_token_utils.rs | 866 ++++++------
 .../parquet_token_v2_processor.rs | 22 +-
 .../src/processors/token_v2_processor.rs | 77 +-
 .../token_v2_processor/token_v2_extractor.rs | 32 +-
 22 files changed, 2358 insertions(+), 1069 deletions(-)
 create mode 100644 rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs
 create mode 100644 rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs

diff --git a/rust/processor/src/db/common/models/ans_models/raw_ans_lookup_v2.rs b/rust/processor/src/db/common/models/ans_models/raw_ans_lookup_v2.rs
index 694b0eb0e..da6af0597 100644
--- a/rust/processor/src/db/common/models/ans_models/raw_ans_lookup_v2.rs
+++ b/rust/processor/src/db/common/models/ans_models/raw_ans_lookup_v2.rs
@@ -11,13 +11,13 @@ use crate::{
             ans_lookup::{AnsLookup, CurrentAnsLookup},
             ans_utils::{get_token_name, NameRecordV2, SubdomainExtV2},
         },
-        token_v2_models::v2_token_utils::TokenStandard,
     },
     utils::util::standardize_address,
 };
 use ahash::AHashMap;
 use aptos_protos::transaction::v1::WriteResource;
 use serde::{Deserialize, Serialize};
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard;
 
 type Domain = String;
 type Subdomain = String;
diff --git a/rust/processor/src/db/common/models/ans_models/raw_ans_primary_name_v2.rs b/rust/processor/src/db/common/models/ans_models/raw_ans_primary_name_v2.rs
index d59db4a2e..7d0c6dada 100644
--- a/rust/processor/src/db/common/models/ans_models/raw_ans_primary_name_v2.rs
+++ b/rust/processor/src/db/common/models/ans_models/raw_ans_primary_name_v2.rs
@@ -10,11 +10,10 @@ use crate::db::postgres::models::{
         ans_lookup::{AnsPrimaryName, CurrentAnsPrimaryName},
         ans_utils::SetReverseLookupEvent,
     },
-    token_v2_models::v2_token_utils::TokenStandard,
 };
 use aptos_protos::transaction::v1::Event;
 use serde::{Deserialize, Serialize};
-
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard;
 type RegisteredAddress = String;
 // PK of current_ans_primary_name
 type CurrentAnsPrimaryNameV2PK = (RegisteredAddress, TokenStandardType);
diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs
index 4ee39930c..07626f120 100644
--- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs
+++ 
b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_activities.rs @@ -13,7 +13,7 @@ use crate::{ }, fungible_asset_models::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent}, object_models::v2_object_utils::ObjectAggregatedDataMapping, - token_v2_models::v2_token_utils::TokenStandard, + }, utils::util::standardize_address, }; @@ -22,7 +22,7 @@ use anyhow::Context; use aptos_protos::transaction::v1::{Event, TransactionInfo, UserTransactionRequest}; use bigdecimal::{BigDecimal, Zero}; use serde::{Deserialize, Serialize}; - +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; pub const GAS_FEE_EVENT: &str = "0x1::aptos_coin::GasFeeEvent"; // We will never have a negative number on chain so this will avoid collision in postgres pub const BURN_GAS_EVENT_CREATION_NUM: i64 = -1; diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs index 0356699c1..1c85e1df3 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_asset_balances.rs @@ -13,7 +13,6 @@ use crate::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore, object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, - token_v2_models::v2_token_utils::{TokenStandard, V2_STANDARD}, }, }, utils::util::{ @@ -27,6 +26,8 @@ use bigdecimal::{BigDecimal, Zero}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::borrow::Borrow; +use crate::db::common::models::token_v2_models::v2_token_utils::{V2_STANDARD, TokenStandard}; + // Storage id pub type CurrentFungibleAssetBalancePK = String; diff --git a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs index 26baf5bf8..5e42ef910 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/raw_v2_fungible_metadata.rs @@ -11,7 +11,7 @@ use crate::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, - token_v2_models::v2_token_utils::TokenStandard, + }, utils::util::standardize_address, }; @@ -19,7 +19,7 @@ use ahash::AHashMap; use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; - +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; // This is the asset type pub type FungibleAssetMetadataPK = String; pub type FungibleAssetMetadataMapping = diff --git a/rust/processor/src/db/common/models/token_v2_models/mod.rs b/rust/processor/src/db/common/models/token_v2_models/mod.rs index b8ae30ccb..f00227296 100644 --- a/rust/processor/src/db/common/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/common/models/token_v2_models/mod.rs @@ -3,3 +3,5 @@ pub mod raw_v1_token_royalty; pub mod raw_v2_token_activities; pub mod raw_v2_token_datas; pub mod raw_v2_token_metadata; +pub mod raw_v2_token_ownerships; +pub mod v2_token_utils; diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs 
b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs index 867e580cd..99dfb314c 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_activities.rs @@ -11,7 +11,6 @@ use crate::{ postgres::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::token_utils::{TokenDataIdType, TokenEvent}, - token_v2_models::v2_token_utils::{TokenStandard, V2TokenEvent}, }, }, utils::util::standardize_address, @@ -19,6 +18,8 @@ use crate::{ use aptos_protos::transaction::v1::Event; use bigdecimal::{BigDecimal, One, Zero}; use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::v2_token_utils::V2TokenEvent; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct RawTokenActivityV2 { diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs index 8b3c85018..433a9b543 100644 --- a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_datas.rs @@ -10,13 +10,16 @@ use crate::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, resources::FromWriteResource, token_models::token_utils::TokenWriteSet, - token_v2_models::v2_token_utils::{TokenStandard, TokenV2, TokenV2Burned}, }, utils::util::standardize_address, }; use aptos_protos::transaction::v1::{DeleteResource, WriteResource, WriteTableItem}; use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Burned; + #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct RawTokenDataV2 { diff --git a/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs new file mode 100644 index 000000000..c76261a13 --- /dev/null +++ b/rust/processor/src/db/common/models/token_v2_models/raw_v2_token_ownerships.rs @@ -0,0 +1,655 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + db::postgres::models::{ + object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, + resources::FromWriteResource, + token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, + }, + schema::{current_token_ownerships_v2}, + utils::{ + database::DbPoolConnection, + util::{ensure_not_negative, standardize_address}, + }, +}; +use ahash::AHashMap; +use anyhow::Context; +use aptos_protos::transaction::v1::{ + DeleteResource, DeleteTableItem, WriteResource, WriteTableItem, +}; +use bigdecimal::{BigDecimal, One, Zero}; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use field_count::FieldCount; +use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; +use crate::db::common::models::token_v2_models::raw_v2_token_datas::RawTokenDataV2; +use 
crate::db::common::models::token_v2_models::v2_token_utils::DEFAULT_OWNER_ADDRESS;
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Burned;
+
+// PK of current_token_ownerships_v2, i.e. token_data_id, property_version_v1, owner_address, storage_id
+pub type CurrentTokenOwnershipV2PK = (String, BigDecimal, String, String);
+
+#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)]
+pub struct RawTokenOwnershipV2 {
+    pub transaction_version: i64,
+    pub write_set_change_index: i64,
+    pub token_data_id: String,
+    pub property_version_v1: BigDecimal,
+    pub owner_address: Option<String>,
+    pub storage_id: String,
+    pub amount: BigDecimal,
+    pub table_type_v1: Option<String>,
+    pub token_properties_mutated_v1: Option<serde_json::Value>,
+    pub is_soulbound_v2: Option<bool>,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    pub transaction_timestamp: chrono::NaiveDateTime,
+    pub non_transferrable_by_owner: Option<bool>,
+}
+
+pub trait TokenOwnershipV2Convertible {
+    fn from_raw(raw_item: RawTokenOwnershipV2) -> Self;
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct RawCurrentTokenOwnershipV2 {
+    pub token_data_id: String,
+    pub property_version_v1: BigDecimal,
+    pub owner_address: String,
+    pub storage_id: String,
+    pub amount: BigDecimal,
+    pub table_type_v1: Option<String>,
+    pub token_properties_mutated_v1: Option<serde_json::Value>,
+    pub is_soulbound_v2: Option<bool>,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    pub last_transaction_version: i64,
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+    pub non_transferrable_by_owner: Option<bool>,
+}
+
+impl Ord for RawCurrentTokenOwnershipV2 {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.token_data_id
+            .cmp(&other.token_data_id)
+            .then(self.property_version_v1.cmp(&other.property_version_v1))
+            .then(self.owner_address.cmp(&other.owner_address))
+            .then(self.storage_id.cmp(&other.storage_id))
+    }
+}
+
+impl PartialOrd for RawCurrentTokenOwnershipV2 {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+pub trait CurrentTokenOwnershipV2Convertible {
+    fn from_raw(raw_item: RawCurrentTokenOwnershipV2) -> Self;
+}
+
+// Facilitate tracking when a token is burned
+#[derive(Clone, Debug)]
+pub struct NFTOwnershipV2 {
+    pub token_data_id: String,
+    pub owner_address: String,
+    pub is_soulbound: Option<bool>,
+}
+
+/// Need a separate struct for queryable because we don't want to define the inserted_at column (letting DB fill)
+#[derive(Clone, Debug, Queryable)]
+pub struct CurrentTokenOwnershipV2Query {
+    pub token_data_id: String,
+    pub property_version_v1: BigDecimal,
+    pub owner_address: String,
+    pub storage_id: String,
+    pub amount: BigDecimal,
+    pub table_type_v1: Option<String>,
+    pub token_properties_mutated_v1: Option<serde_json::Value>,
+    pub is_soulbound_v2: Option<bool>,
+    pub token_standard: String,
+    pub is_fungible_v2: Option<bool>,
+    pub last_transaction_version: i64,
+    pub last_transaction_timestamp: chrono::NaiveDateTime,
+    pub inserted_at: chrono::NaiveDateTime,
+    pub non_transferrable_by_owner: Option<bool>,
+}
+
+impl RawTokenOwnershipV2 {
+    /// For NFTs it's the same resources that we parse token datas from, so we leverage
+    /// the work done there to get ownership data.
+    /// Vecs are returned because there could be multiple transfers in a single
+    /// transaction and we need to document each one here.
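+    ///
+    /// Illustrative expectation (not asserted anywhere, just to make the shape concrete):
+    /// a mint followed by one transfer in the same transaction yields two
+    /// `RawTokenOwnershipV2` rows (an amount-1 row for the new owner and an amount-0
+    /// "soft delete" row for the previous owner), plus a `RawCurrentTokenOwnershipV2`
+    /// entry per (token_data_id, property_version_v1, owner_address, storage_id) key.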
+    pub fn get_nft_v2_from_token_data(
+        token_data: &RawTokenDataV2,
+        object_metadatas: &ObjectAggregatedDataMapping,
+    ) -> anyhow::Result<(
+        Vec<Self>,
+        AHashMap<CurrentTokenOwnershipV2PK, RawCurrentTokenOwnershipV2>,
+    )> {
+        let mut ownerships = vec![];
+        let mut current_ownerships = AHashMap::new();
+
+        let object_data = object_metadatas
+            .get(&token_data.token_data_id)
+            .context("If token data exists, the ObjectCore must exist")?;
+        let object_core = object_data.object.object_core.clone();
+        let token_data_id = token_data.token_data_id.clone();
+        let owner_address = object_core.get_owner_address();
+        let storage_id = token_data_id.clone();
+
+        // is_soulbound currently means if an object is completely untransferrable
+        // OR if only admin can transfer. Only the former is true soulbound but
+        // people might already be using it with the latter meaning so let's include both.
+        let is_soulbound = if object_data.untransferable.as_ref().is_some() {
+            true
+        } else {
+            !object_core.allow_ungated_transfer
+        };
+        let non_transferrable_by_owner = !object_core.allow_ungated_transfer;
+
+        ownerships.push(Self {
+            transaction_version: token_data.transaction_version,
+            write_set_change_index: token_data.write_set_change_index,
+            token_data_id: token_data_id.clone(),
+            property_version_v1: BigDecimal::zero(),
+            owner_address: Some(owner_address.clone()),
+            storage_id: storage_id.clone(),
+            amount: BigDecimal::one(),
+            table_type_v1: None,
+            token_properties_mutated_v1: None,
+            is_soulbound_v2: Some(is_soulbound),
+            token_standard: TokenStandard::V2.to_string(),
+            is_fungible_v2: None,
+            transaction_timestamp: token_data.transaction_timestamp,
+            non_transferrable_by_owner: Some(non_transferrable_by_owner),
+        });
+        current_ownerships.insert(
+            (
+                token_data_id.clone(),
+                BigDecimal::zero(),
+                owner_address.clone(),
+                storage_id.clone(),
+            ),
+            RawCurrentTokenOwnershipV2 {
+                token_data_id: token_data_id.clone(),
+                property_version_v1: BigDecimal::zero(),
+                owner_address,
+                storage_id: storage_id.clone(),
+                amount: BigDecimal::one(),
+                table_type_v1: None,
+                token_properties_mutated_v1: None,
+                is_soulbound_v2: Some(is_soulbound),
+                token_standard: TokenStandard::V2.to_string(),
+                is_fungible_v2: None,
+                last_transaction_version: token_data.transaction_version,
+                last_transaction_timestamp: token_data.transaction_timestamp,
+                non_transferrable_by_owner: Some(non_transferrable_by_owner),
+            },
+        );
+
+        // check if token was transferred
+        for (event_index, transfer_event) in &object_data.transfer_events {
+            // If it's a self transfer then skip
+            if transfer_event.get_to_address() == transfer_event.get_from_address() {
+                continue;
+            }
+            ownerships.push(Self {
+                transaction_version: token_data.transaction_version,
+                // set to negative of event index to avoid collision with write set change index
+                write_set_change_index: -event_index,
+                token_data_id: token_data_id.clone(),
+                property_version_v1: BigDecimal::zero(),
+                // previous owner
+                owner_address: Some(transfer_event.get_from_address()),
+                storage_id: storage_id.clone(),
+                // soft delete
+                amount: BigDecimal::zero(),
+                table_type_v1: None,
+                token_properties_mutated_v1: None,
+                is_soulbound_v2: Some(is_soulbound),
+                token_standard: TokenStandard::V2.to_string(),
+                is_fungible_v2: None,
+                transaction_timestamp: token_data.transaction_timestamp,
+                non_transferrable_by_owner: Some(is_soulbound),
+            });
+            current_ownerships.insert(
+                (
+                    token_data_id.clone(),
+                    BigDecimal::zero(),
+                    transfer_event.get_from_address(),
+                    storage_id.clone(),
+                ),
+                RawCurrentTokenOwnershipV2 {
+                    token_data_id: token_data_id.clone(),
+                    property_version_v1: BigDecimal::zero(),
+                    // previous owner
+                    owner_address: transfer_event.get_from_address(),
+                    storage_id: storage_id.clone(),
+                    // soft delete
+                    amount: BigDecimal::zero(),
+                    table_type_v1: None,
+                    token_properties_mutated_v1: None,
+                    is_soulbound_v2: Some(is_soulbound),
+                    token_standard: TokenStandard::V2.to_string(),
+                    is_fungible_v2: None,
+                    last_transaction_version: token_data.transaction_version,
+                    last_transaction_timestamp: token_data.transaction_timestamp,
+                    non_transferrable_by_owner: Some(is_soulbound),
+                },
+            );
+        }
+        Ok((ownerships, current_ownerships))
+    }
+
+    /// This handles the case where the token is burned but the ObjectCore is still there.
+    pub async fn get_burned_nft_v2_from_write_resource(
+        write_resource: &WriteResource,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
+        tokens_burned: &TokenV2Burned,
+        object_metadatas: &ObjectAggregatedDataMapping,
+        conn: &mut DbPoolConnection<'_>,
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+    ) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
+        let token_data_id = standardize_address(&write_resource.address.to_string());
+        if tokens_burned
+            .get(&standardize_address(&token_data_id))
+            .is_some()
+        {
+            if let Some(object) = &ObjectWithMetadata::from_write_resource(write_resource)? {
+                let object_core = &object.object_core;
+                let owner_address = object_core.get_owner_address();
+                let storage_id = token_data_id.clone();
+
+                // is_soulbound currently means if an object is completely untransferrable
+                // OR if only admin can transfer. Only the former is true soulbound but
+                // people might already be using it with the latter meaning so let's include both.
+                let is_soulbound = if object_metadatas
+                    .get(&token_data_id)
+                    .map(|obj| obj.untransferable.as_ref())
+                    .is_some()
+                {
+                    true
+                } else {
+                    !object_core.allow_ungated_transfer
+                };
+                let non_transferrable_by_owner = !object_core.allow_ungated_transfer;
+
+                return Ok(Some((
+                    Self {
+                        transaction_version: txn_version,
+                        write_set_change_index,
+                        token_data_id: token_data_id.clone(),
+                        property_version_v1: BigDecimal::zero(),
+                        owner_address: Some(owner_address.clone()),
+                        storage_id: storage_id.clone(),
+                        amount: BigDecimal::zero(),
+                        table_type_v1: None,
+                        token_properties_mutated_v1: None,
+                        is_soulbound_v2: Some(is_soulbound),
+                        token_standard: TokenStandard::V2.to_string(),
+                        is_fungible_v2: Some(false),
+                        transaction_timestamp: txn_timestamp,
+                        non_transferrable_by_owner: Some(non_transferrable_by_owner),
+                    },
+                    RawCurrentTokenOwnershipV2 {
+                        token_data_id,
+                        property_version_v1: BigDecimal::zero(),
+                        owner_address,
+                        storage_id,
+                        amount: BigDecimal::zero(),
+                        table_type_v1: None,
+                        token_properties_mutated_v1: None,
+                        is_soulbound_v2: Some(is_soulbound),
+                        token_standard: TokenStandard::V2.to_string(),
+                        is_fungible_v2: Some(false),
+                        last_transaction_version: txn_version,
+                        last_transaction_timestamp: txn_timestamp,
+                        non_transferrable_by_owner: Some(non_transferrable_by_owner),
+                    },
+                )));
+            } else {
+                return Self::get_burned_nft_v2_helper(
+                    &token_data_id,
+                    txn_version,
+                    write_set_change_index,
+                    txn_timestamp,
+                    prior_nft_ownership,
+                    tokens_burned,
+                    conn,
+                    query_retries,
+                    query_retry_delay_ms,
+                )
+                .await;
+            }
+        }
+        Ok(None)
+    }
+
+    /// This handles the case where the token is burned and the ObjectCore is deleted.
+    pub async fn get_burned_nft_v2_from_delete_resource(
+        delete_resource: &DeleteResource,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
+        tokens_burned: &TokenV2Burned,
+        conn: &mut DbPoolConnection<'_>,
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+    ) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
+        let token_address = standardize_address(&delete_resource.address.to_string());
+        Self::get_burned_nft_v2_helper(
+            &token_address,
+            txn_version,
+            write_set_change_index,
+            txn_timestamp,
+            prior_nft_ownership,
+            tokens_burned,
+            conn,
+            query_retries,
+            query_retry_delay_ms,
+        )
+        .await
+    }
+
+    async fn get_burned_nft_v2_helper(
+        token_address: &str,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        prior_nft_ownership: &AHashMap<String, NFTOwnershipV2>,
+        tokens_burned: &TokenV2Burned,
+        conn: &mut DbPoolConnection<'_>,
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+    ) -> anyhow::Result<Option<(Self, RawCurrentTokenOwnershipV2)>> {
+        let token_address = standardize_address(token_address);
+        if let Some(burn_event) = tokens_burned.get(&token_address) {
+            // 1. Try to look up the token address in the burn event mapping.
+            let previous_owner = if let Some(previous_owner) =
+                burn_event.get_previous_owner_address()
+            {
+                previous_owner
+            } else {
+                // 2. If it doesn't exist in the burn event mapping, it must be an old burn
+                // event that doesn't contain previous_owner. Do a lookup to get the previous
+                // owner. This is necessary because the previous owner is part of the current
+                // token ownerships primary key.
+                match prior_nft_ownership.get(&token_address) {
+                    Some(inner) => inner.owner_address.clone(),
+                    None => {
+                        match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id(
+                            conn,
+                            &token_address,
+                            query_retries,
+                            query_retry_delay_ms,
+                        )
+                        .await
+                        {
+                            Ok(nft) => nft.owner_address.clone(),
+                            Err(_) => {
+                                tracing::error!(
+                                    transaction_version = txn_version,
+                                    lookup_key = &token_address,
+                                    "Failed to find current_token_ownership_v2 for burned token. You should probably backfill the db."
+                                );
+                                DEFAULT_OWNER_ADDRESS.to_string()
+                            },
+                        }
+                    },
+                }
+            };
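+
+            // `DEFAULT_OWNER_ADDRESS` above is the "unknown" sentinel defined in
+            // v2_token_utils; it keeps owner_address populated, which matters because
+            // owner_address is part of the current_token_ownerships_v2 primary key.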
+            let token_data_id = token_address.clone();
+            let storage_id = token_data_id.clone();
+
+            return Ok(Some((
+                Self {
+                    transaction_version: txn_version,
+                    write_set_change_index,
+                    token_data_id: token_data_id.clone(),
+                    property_version_v1: BigDecimal::zero(),
+                    owner_address: Some(previous_owner.clone()),
+                    storage_id: storage_id.clone(),
+                    amount: BigDecimal::zero(),
+                    table_type_v1: None,
+                    token_properties_mutated_v1: None,
+                    is_soulbound_v2: None, // default
+                    token_standard: TokenStandard::V2.to_string(),
+                    is_fungible_v2: None, // default
+                    transaction_timestamp: txn_timestamp,
+                    non_transferrable_by_owner: None, // default
+                },
+                RawCurrentTokenOwnershipV2 {
+                    token_data_id,
+                    property_version_v1: BigDecimal::zero(),
+                    owner_address: previous_owner,
+                    storage_id,
+                    amount: BigDecimal::zero(),
+                    table_type_v1: None,
+                    token_properties_mutated_v1: None,
+                    is_soulbound_v2: None, // default
+                    token_standard: TokenStandard::V2.to_string(),
+                    is_fungible_v2: None, // default
+                    last_transaction_version: txn_version,
+                    last_transaction_timestamp: txn_timestamp,
+                    non_transferrable_by_owner: None, // default
+                },
+            )));
+        }
+        Ok(None)
+    }
+
+    /// We want to track tokens in any offers/claims and in the token store.
+    pub fn get_v1_from_write_table_item(
+        table_item: &WriteTableItem,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        table_handle_to_owner: &TableHandleToOwner,
+    ) -> anyhow::Result<Option<(Self, Option<RawCurrentTokenOwnershipV2>)>> {
+        let table_item_data = table_item.data.as_ref().unwrap();
+
+        let maybe_token = match TokenWriteSet::from_table_item_type(
+            table_item_data.value_type.as_str(),
+            &table_item_data.value,
+            txn_version,
+        )? {
+            Some(TokenWriteSet::Token(inner)) => Some(inner),
+            _ => None,
+        };
+
+        if let Some(token) = maybe_token {
+            let table_handle = standardize_address(&table_item.handle.to_string());
+            let amount = ensure_not_negative(token.amount);
+            let token_id_struct = token.id;
+            let token_data_id_struct = token_id_struct.token_data_id;
+            let token_data_id = token_data_id_struct.to_id();
+
+            let maybe_table_metadata = table_handle_to_owner.get(&table_handle);
+            let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata {
+                Some(tm) => {
+                    if tm.table_type != "0x3::token::TokenStore" {
+                        return Ok(None);
+                    }
+                    let owner_address = tm.get_owner_address();
+                    (
+                        Some(RawCurrentTokenOwnershipV2 {
+                            token_data_id: token_data_id.clone(),
+                            property_version_v1: token_id_struct.property_version.clone(),
+                            owner_address: owner_address.clone(),
+                            storage_id: table_handle.clone(),
+                            amount: amount.clone(),
+                            table_type_v1: Some(tm.table_type.clone()),
+                            token_properties_mutated_v1: Some(token.token_properties.clone()),
+                            is_soulbound_v2: None,
+                            token_standard: TokenStandard::V1.to_string(),
+                            is_fungible_v2: None,
+                            last_transaction_version: txn_version,
+                            last_transaction_timestamp: txn_timestamp,
+                            non_transferrable_by_owner: None,
+                        }),
+                        Some(owner_address),
+                        Some(tm.table_type.clone()),
+                    )
+                },
+                None => (None, None, None),
+            };
+
+            Ok(Some((
+                Self {
+                    transaction_version: txn_version,
+                    write_set_change_index,
+                    token_data_id,
+                    property_version_v1: token_id_struct.property_version,
+                    owner_address,
+                    storage_id: table_handle,
+                    amount,
+                    table_type_v1: table_type,
+                    token_properties_mutated_v1: Some(token.token_properties),
+                    is_soulbound_v2: None,
+                    token_standard: TokenStandard::V1.to_string(),
+                    is_fungible_v2: None,
+                    transaction_timestamp: txn_timestamp,
+                    non_transferrable_by_owner: None,
+                },
+                curr_token_ownership,
+            )))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// We want to track tokens in any offers/claims and in the token store.
+    pub fn get_v1_from_delete_table_item(
+        table_item: &DeleteTableItem,
+        txn_version: i64,
+        write_set_change_index: i64,
+        txn_timestamp: chrono::NaiveDateTime,
+        table_handle_to_owner: &TableHandleToOwner,
+    ) -> anyhow::Result<Option<(Self, Option<RawCurrentTokenOwnershipV2>)>> {
+        let table_item_data = table_item.data.as_ref().unwrap();
+
+        let maybe_token_id = match TokenWriteSet::from_table_item_type(
+            table_item_data.key_type.as_str(),
+            &table_item_data.key,
+            txn_version,
+        )? {
+            Some(TokenWriteSet::TokenId(inner)) => Some(inner),
+            _ => None,
+        };
+
+        if let Some(token_id_struct) = maybe_token_id {
+            let table_handle = standardize_address(&table_item.handle.to_string());
+            let token_data_id_struct = token_id_struct.token_data_id;
+            let token_data_id = token_data_id_struct.to_id();
+
+            let maybe_table_metadata = table_handle_to_owner.get(&table_handle);
+            let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata {
+                Some(tm) => {
+                    if tm.table_type != "0x3::token::TokenStore" {
+                        return Ok(None);
+                    }
+                    let owner_address = tm.get_owner_address();
+                    (
+                        Some(RawCurrentTokenOwnershipV2 {
+                            token_data_id: token_data_id.clone(),
+                            property_version_v1: token_id_struct.property_version.clone(),
+                            owner_address: owner_address.clone(),
+                            storage_id: table_handle.clone(),
+                            amount: BigDecimal::zero(),
+                            table_type_v1: Some(tm.table_type.clone()),
+                            token_properties_mutated_v1: None,
+                            is_soulbound_v2: None,
+                            token_standard: TokenStandard::V1.to_string(),
+                            is_fungible_v2: None,
+                            last_transaction_version: txn_version,
+                            last_transaction_timestamp: txn_timestamp,
+                            non_transferrable_by_owner: None,
+                        }),
+                        Some(owner_address),
+                        Some(tm.table_type.clone()),
+                    )
+                },
+                None => (None, None, None),
+            };
+
+            Ok(Some((
+                Self {
+                    transaction_version: txn_version,
+                    write_set_change_index,
+                    token_data_id,
+                    property_version_v1: token_id_struct.property_version,
+                    owner_address,
+                    storage_id: table_handle,
+                    amount: BigDecimal::zero(),
+                    table_type_v1: table_type,
+                    token_properties_mutated_v1: None,
+                    is_soulbound_v2: None,
+                    token_standard: TokenStandard::V1.to_string(),
+                    is_fungible_v2: None,
+                    transaction_timestamp: txn_timestamp,
+                    non_transferrable_by_owner: None,
+                },
+                curr_token_ownership,
+            )))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl CurrentTokenOwnershipV2Query {
+    pub async fn get_latest_owned_nft_by_token_data_id(
+        conn: &mut DbPoolConnection<'_>,
+        token_data_id: &str,
+        query_retries: u32,
+        query_retry_delay_ms: u64,
+    ) -> anyhow::Result<NFTOwnershipV2> {
+        let mut tried = 0;
+        while tried < query_retries {
+            tried += 1;
+            match Self::get_latest_owned_nft_by_token_data_id_impl(conn, token_data_id).await {
+                Ok(inner) => {
+                    return Ok(NFTOwnershipV2 {
+                        token_data_id: inner.token_data_id.clone(),
+                        owner_address: inner.owner_address.clone(),
+                        is_soulbound: inner.is_soulbound_v2,
+                    });
+                },
+                Err(_) => {
+                    if tried < query_retries {
+                        tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
+                            .await;
+                    }
+                },
+            }
+        }
+        Err(anyhow::anyhow!(
+            "Failed to get NFT by token data id: {}",
+            token_data_id
+        ))
+    }
+
+    async fn get_latest_owned_nft_by_token_data_id_impl(
+        conn: &mut DbPoolConnection<'_>,
+        token_data_id: &str,
+    ) -> diesel::QueryResult<Self> {
+        current_token_ownerships_v2::table
+            .filter(current_token_ownerships_v2::token_data_id.eq(token_data_id))
+            .filter(current_token_ownerships_v2::amount.gt(BigDecimal::zero()))
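+            // `current_token_ownerships_v2` is keyed on (token_data_id,
+            // property_version_v1, owner_address, storage_id); for an NFT at most one
+            // row should carry amount > 0, so the first match is taken as the latest
+            // owner without an explicit ordering.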
+            .first::<Self>(conn)
+            .await
+    }
+}
diff --git a/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs
new file mode 100644
index 000000000..b6bab62bf
--- /dev/null
+++ b/rust/processor/src/db/common/models/token_v2_models/v2_token_utils.rs
@@ -0,0 +1,433 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+// This is required because a diesel macro makes clippy sad
+#![allow(clippy::extra_unused_lifetimes)]
+
+use crate::{
+    db::postgres::models::{
+        object_models::v2_object_utils::CurrentObjectPK,
+        token_models::token_utils::{NAME_LENGTH, URI_LENGTH},
+    },
+    utils::util::{
+        deserialize_from_string, deserialize_token_object_property_map_from_bcs_hexstring,
+        standardize_address, truncate_str, Aggregator, AggregatorSnapshot, DerivedStringSnapshot,
+    },
+};
+use ahash::{AHashMap, AHashSet};
+use anyhow::{Context, Result};
+use aptos_protos::transaction::v1::{Event, WriteResource};
+use bigdecimal::BigDecimal;
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use std::fmt::{self, Formatter};
+
+pub const DEFAULT_OWNER_ADDRESS: &str = "unknown";
+
+lazy_static! {
+    pub static ref V2_STANDARD: String = TokenStandard::V2.to_string();
+}
+
+/// Tracks all token related data in a hashmap for quick access (keyed on address of the object core).
+/// Maps address to burn event. If it's an old event, previous_owner will be empty.
+pub type TokenV2Burned = AHashMap<CurrentObjectPK, Burn>;
+pub type TokenV2Minted = AHashSet<CurrentObjectPK>;
+
+/// Tracks which token standard a token / collection is built upon
+#[derive(Serialize)]
+pub enum TokenStandard {
+    V1,
+    V2,
+}
+
+impl fmt::Display for TokenStandard {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        let res = match self {
+            TokenStandard::V1 => "v1",
+            TokenStandard::V2 => "v2",
+        };
+        write!(f, "{}", res)
+    }
+}
+
+/* Section on Collection / Token */
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Collection {
+    creator: String,
+    pub description: String,
+    // These are set to private because we should never get name or uri directly
+    name: String,
+    uri: String,
+}
+
+impl Collection {
+    pub fn get_creator_address(&self) -> String {
+        standardize_address(&self.creator)
+    }
+
+    pub fn get_uri_trunc(&self) -> String {
+        truncate_str(&self.uri, URI_LENGTH)
+    }
+
+    pub fn get_name_trunc(&self) -> String {
+        truncate_str(&self.name, NAME_LENGTH)
+    }
+}
+
+impl TryFrom<&WriteResource> for Collection {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct AptosCollection {
+    pub mutable_description: bool,
+    pub mutable_uri: bool,
+}
+
+impl TryFrom<&WriteResource> for AptosCollection {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct TokenV2 {
+    collection: ResourceReference,
+    pub description: String,
+    // These are set to private because we should never get name or uri directly
+    name: String,
+    uri: String,
+}
+
+impl TryFrom<&WriteResource> for TokenV2 {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
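+
+// Illustrative, test-only sketch (not part of the migration itself; the resource JSON
+// and the 0x99 address are made up): shows the write-resource JSON shape that
+// `TokenV2` deserializes from via the `TryFrom<&WriteResource>` impl above.
+#[cfg(test)]
+mod token_v2_resource_tests {
+    use super::*;
+
+    #[test]
+    fn parses_token_v2_resource_json() {
+        let data = r#"{
+            "collection": { "inner": "0x99" },
+            "description": "example token",
+            "name": "example name",
+            "uri": "https://example.com"
+        }"#;
+        let token: TokenV2 = serde_json::from_str(data).unwrap();
+        // Both sides standardize the same raw address, so this holds by construction.
+        assert_eq!(token.get_collection_address(), standardize_address("0x99"));
+    }
+}
+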
+impl TokenV2 {
+    pub fn get_collection_address(&self) -> String {
+        self.collection.get_reference_address()
+    }
+
+    pub fn get_uri_trunc(&self) -> String {
+        truncate_str(&self.uri, URI_LENGTH)
+    }
+
+    pub fn get_name_trunc(&self) -> String {
+        truncate_str(&self.name, NAME_LENGTH)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ResourceReference {
+    inner: String,
+}
+
+impl ResourceReference {
+    pub fn get_reference_address(&self) -> String {
+        standardize_address(&self.inner)
+    }
+}
+
+/* Section on Supply */
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct FixedSupply {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub current_supply: BigDecimal,
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub max_supply: BigDecimal,
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub total_minted: BigDecimal,
+}
+
+impl TryFrom<&WriteResource> for FixedSupply {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct UnlimitedSupply {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub current_supply: BigDecimal,
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub total_minted: BigDecimal,
+}
+
+impl TryFrom<&WriteResource> for UnlimitedSupply {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ConcurrentSupply {
+    pub current_supply: Aggregator,
+    pub total_minted: Aggregator,
+}
+
+impl TryFrom<&WriteResource> for ConcurrentSupply {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+/* Section on Events */
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MintEvent {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub index: BigDecimal,
+    token: String,
+}
+
+impl MintEvent {
+    pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+        if let Some(V2TokenEvent::MintEvent(inner)) =
+            V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn get_token_address(&self) -> String {
+        standardize_address(&self.token)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Mint {
+    collection: String,
+    pub index: AggregatorSnapshot,
+    token: String,
+}
+
+impl Mint {
+    pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+        if let Some(V2TokenEvent::Mint(inner)) =
+            V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn get_token_address(&self) -> String {
+        standardize_address(&self.token)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct TokenMutationEvent {
+    pub mutated_field_name: String,
+    pub old_value: String,
+    pub new_value: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct TokenMutationEventV2 {
+    pub token_address: String,
+    pub mutated_field_name: String,
+    pub old_value: String,
+    pub new_value: String,
+}
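+
+// Note on the two burn shapes: `BurnEvent` below is the older event without
+// `previous_owner`, while `Burn` carries `previous_owner` so the prior owner can be
+// resolved without a DB lookup (see `get_burned_nft_v2_helper` in
+// raw_v2_token_ownerships.rs).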
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct BurnEvent {
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub index: BigDecimal,
+    token: String,
+}
+
+impl BurnEvent {
+    pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+        if let Some(V2TokenEvent::BurnEvent(inner)) =
+            V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn get_token_address(&self) -> String {
+        standardize_address(&self.token)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Burn {
+    collection: String,
+    #[serde(deserialize_with = "deserialize_from_string")]
+    pub index: BigDecimal,
+    token: String,
+    previous_owner: String,
+}
+
+impl Burn {
+    pub fn new(
+        collection: String,
+        index: BigDecimal,
+        token: String,
+        previous_owner: String,
+    ) -> Self {
+        Burn {
+            collection,
+            index,
+            token,
+            previous_owner,
+        }
+    }
+
+    pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+        if let Some(V2TokenEvent::Burn(inner)) =
+            V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn get_token_address(&self) -> String {
+        standardize_address(&self.token)
+    }
+
+    pub fn get_previous_owner_address(&self) -> Option<String> {
+        if self.previous_owner.is_empty() {
+            None
+        } else {
+            Some(standardize_address(&self.previous_owner))
+        }
+    }
+
+    pub fn get_collection_address(&self) -> String {
+        standardize_address(&self.collection)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct TransferEvent {
+    from: String,
+    to: String,
+    object: String,
+}
+
+impl TransferEvent {
+    pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+        if let Some(V2TokenEvent::TransferEvent(inner)) =
+            V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+        {
+            Ok(Some(inner))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn get_from_address(&self) -> String {
+        standardize_address(&self.from)
+    }
+
+    pub fn get_to_address(&self) -> String {
+        standardize_address(&self.to)
+    }
+
+    pub fn get_object_address(&self) -> String {
+        standardize_address(&self.object)
+    }
+}
+
+/* Section on Property Maps */
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct PropertyMapModel {
+    #[serde(deserialize_with = "deserialize_token_object_property_map_from_bcs_hexstring")]
+    pub inner: serde_json::Value,
+}
+
+impl TryFrom<&WriteResource> for PropertyMapModel {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct TokenIdentifiers {
+    name: DerivedStringSnapshot,
+}
+
+impl TryFrom<&WriteResource> for TokenIdentifiers {
+    type Error = anyhow::Error;
+
+    fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+        serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+    }
+}
+
+impl TokenIdentifiers {
+    pub fn get_name_trunc(&self) -> String {
+        truncate_str(&self.name.value, NAME_LENGTH)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub enum V2TokenEvent {
+    Mint(Mint),
+    MintEvent(MintEvent),
+    TokenMutationEvent(TokenMutationEvent),
+    TokenMutation(TokenMutationEventV2),
+    Burn(Burn),
+    BurnEvent(BurnEvent),
+    TransferEvent(TransferEvent),
+}
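+
+// Illustrative, test-only sketch (not required by the migration; the event payloads
+// and addresses below are made up): `from_event` maps an on-chain event type string
+// to the matching variant and yields `Ok(None)` for types this module ignores.
+#[cfg(test)]
+mod v2_token_event_tests {
+    use super::*;
+
+    #[test]
+    fn maps_type_strings_to_variants() {
+        let transfer = r#"{"from":"0x1","to":"0x2","object":"0x3"}"#;
+        let parsed =
+            V2TokenEvent::from_event("0x1::object::TransferEvent", transfer, 0).unwrap();
+        assert!(matches!(parsed, Some(V2TokenEvent::TransferEvent(_))));
+
+        // Unknown event types are skipped rather than treated as errors.
+        let unknown = V2TokenEvent::from_event("0x1::coin::DepositEvent", "{}", 0).unwrap();
+        assert!(unknown.is_none());
+    }
+}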
"0x4::collection::Mint" => { + serde_json::from_str(data).map(|inner| Some(Self::Mint(inner))) + }, + "0x4::collection::MintEvent" => { + serde_json::from_str(data).map(|inner| Some(Self::MintEvent(inner))) + }, + "0x4::token::MutationEvent" => { + serde_json::from_str(data).map(|inner| Some(Self::TokenMutationEvent(inner))) + }, + "0x4::token::Mutation" => { + serde_json::from_str(data).map(|inner| Some(Self::TokenMutation(inner))) + }, + "0x4::collection::Burn" => { + serde_json::from_str(data).map(|inner| Some(Self::Burn(inner))) + }, + "0x4::collection::BurnEvent" => { + serde_json::from_str(data).map(|inner| Some(Self::BurnEvent(inner))) + }, + "0x1::object::TransferEvent" | "0x1::object::Transfer" => { + serde_json::from_str(data).map(|inner| Some(Self::TransferEvent(inner))) + }, + _ => Ok(None), + } + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )) + } +} diff --git a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs index 74516dd51..2bfdc8a71 100644 --- a/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/db/parquet/models/token_v2_models/v2_token_ownerships.rs @@ -15,8 +15,7 @@ use crate::{ resources::FromWriteResource, token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, token_v2_models::{ - v2_token_ownerships::{CurrentTokenOwnershipV2, NFTOwnershipV2}, - v2_token_utils::{TokenStandard, TokenV2Burned, DEFAULT_OWNER_ADDRESS}, + v2_token_ownerships::{CurrentTokenOwnershipV2}, }, }, }, @@ -32,6 +31,10 @@ use bigdecimal::{BigDecimal, ToPrimitive, Zero}; use field_count::FieldCount; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; +use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::NFTOwnershipV2; +use crate::db::common::models::token_v2_models::v2_token_utils::DEFAULT_OWNER_ADDRESS; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Burned; const LEGACY_DEFAULT_PROPERTY_VERSION: u64 = 0; @@ -71,6 +74,7 @@ impl GetTimeStamp for TokenOwnershipV2 { } } + impl TokenOwnershipV2 { /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data /// Vecs are returned because there could be multiple transfers and we need to document each one here. 
diff --git a/rust/processor/src/db/postgres/models/ans_models/parquet_ans_lookup_v2.rs b/rust/processor/src/db/postgres/models/ans_models/parquet_ans_lookup_v2.rs
index db7cf6a56..0a87df5fb 100644
--- a/rust/processor/src/db/postgres/models/ans_models/parquet_ans_lookup_v2.rs
+++ b/rust/processor/src/db/postgres/models/ans_models/parquet_ans_lookup_v2.rs
@@ -12,7 +12,7 @@ use crate::{
         ans_lookup::{AnsPrimaryName, CurrentAnsPrimaryName},
         ans_utils::SetReverseLookupEvent,
     },
-    token_v2_models::v2_token_utils::TokenStandard,
+
 },
 };
 use allocative_derive::Allocative;
@@ -20,6 +20,8 @@ use aptos_protos::transaction::v1::Event;
 use field_count::FieldCount;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard;
+
 #[derive(
     Allocative, Clone, Default, Debug, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
diff --git a/rust/processor/src/db/postgres/models/fungible_asset_models/v2_fungible_asset_utils.rs b/rust/processor/src/db/postgres/models/fungible_asset_models/v2_fungible_asset_utils.rs
index 8bd504582..120424d3e 100644
--- a/rust/processor/src/db/postgres/models/fungible_asset_models/v2_fungible_asset_utils.rs
+++ b/rust/processor/src/db/postgres/models/fungible_asset_models/v2_fungible_asset_utils.rs
@@ -6,7 +6,7 @@
 use crate::{
     db::postgres::models::{
-        token_models::token_utils::URI_LENGTH, token_v2_models::v2_token_utils::ResourceReference,
+        token_models::token_utils::URI_LENGTH,
     },
     utils::util::{deserialize_from_string, truncate_str, Aggregator},
 };
@@ -15,6 +15,9 @@ use aptos_protos::transaction::v1::WriteResource;
 use bigdecimal::BigDecimal;
 use field_count::FieldCount;
 use serde::{Deserialize, Serialize};
+use crate::db::common::models::token_v2_models::v2_token_utils::ResourceReference;
+
+
 const FUNGIBLE_ASSET_LENGTH: usize = 32;
 const FUNGIBLE_ASSET_SYMBOL: usize = 10;
diff --git a/rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs b/rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs
index 4f4f18aa7..76b3645c1 100644
--- a/rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs
+++ b/rust/processor/src/db/postgres/models/object_models/v2_object_utils.rs
@@ -12,10 +12,7 @@ use crate::{
             FungibleAssetStore, FungibleAssetSupply,
         },
         resources::FromWriteResource,
-        token_v2_models::v2_token_utils::{
-            AptosCollection, ConcurrentSupply, FixedSupply, PropertyMapModel, TokenIdentifiers,
-            TokenV2, TransferEvent, UnlimitedSupply,
-        },
+
 },
     utils::util::{deserialize_from_string, standardize_address},
 };
@@ -23,7 +20,7 @@ use ahash::AHashMap;
 use aptos_protos::transaction::v1::WriteResource;
 use bigdecimal::BigDecimal;
 use serde::{Deserialize, Serialize};
-
+use crate::db::common::models::token_v2_models::v2_token_utils::{TransferEvent, AptosCollection, FixedSupply, PropertyMapModel, TokenV2, UnlimitedSupply, ConcurrentSupply, TokenIdentifiers};
 // PK of current_objects, i.e.
object_address pub type CurrentObjectPK = String; diff --git a/rust/processor/src/db/postgres/models/resources.rs b/rust/processor/src/db/postgres/models/resources.rs index 8610c895f..b36120e51 100644 --- a/rust/processor/src/db/postgres/models/resources.rs +++ b/rust/processor/src/db/postgres/models/resources.rs @@ -8,14 +8,13 @@ use crate::db::postgres::models::{ FungibleAssetStore, FungibleAssetSupply, }, object_models::v2_object_utils::{ObjectCore, Untransferable}, - token_v2_models::v2_token_utils::{ - AptosCollection, Collection, ConcurrentSupply, FixedSupply, PropertyMapModel, - TokenIdentifiers, TokenV2, UnlimitedSupply, - }, + }; use anyhow::Result; use aptos_protos::transaction::v1::WriteResource; use const_format::formatcp; +use crate::db::common::models::token_v2_models::v2_token_utils::{Collection, ConcurrentSupply, FixedSupply, PropertyMapModel, TokenIdentifiers, TokenV2, UnlimitedSupply}; +use crate::db::common::models::token_v2_models::v2_token_utils::AptosCollection; pub const COIN_ADDR: &str = "0x0000000000000000000000000000000000000000000000000000000000000001"; pub const TOKEN_ADDR: &str = "0x0000000000000000000000000000000000000000000000000000000000000003"; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs index 49bd71da5..051cf246a 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/mod.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/mod.rs @@ -7,4 +7,3 @@ pub mod v2_token_activities; pub mod v2_token_datas; pub mod v2_token_metadata; pub mod v2_token_ownerships; -pub mod v2_token_utils; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs index 41a41dd24..fa012fb29 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_collections.rs @@ -5,7 +5,6 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{Collection, TokenStandard}; use crate::{ db::postgres::models::{ object_models::v2_object_utils::ObjectAggregatedDataMapping, @@ -26,6 +25,9 @@ use diesel::{prelude::*, sql_query, sql_types::Text}; use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::v2_token_utils::Collection; +use crate::db::common::models::token_v2_models::v2_token_utils::TokenStandard; + // PK of current_collections_v2, i.e. 
collection_id pub type CurrentCollectionV2PK = String; diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_ownerships.rs index d7228486f..4fc7b5d42 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_ownerships.rs @@ -5,36 +5,19 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::{ - v2_token_datas::TokenDataV2, - v2_token_utils::{TokenStandard, TokenV2Burned}, -}; use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, - resources::FromWriteResource, - token_models::{token_utils::TokenWriteSet, tokens::TableHandleToOwner}, - token_v2_models::v2_token_utils::DEFAULT_OWNER_ADDRESS, - }, schema::{current_token_ownerships_v2, token_ownerships_v2}, - utils::{ - database::DbPoolConnection, - util::{ensure_not_negative, standardize_address}, - }, -}; -use ahash::AHashMap; -use anyhow::Context; -use aptos_protos::transaction::v1::{ - DeleteResource, DeleteTableItem, WriteResource, WriteTableItem, }; -use bigdecimal::{BigDecimal, One, Zero}; +use bigdecimal::{BigDecimal}; use diesel::prelude::*; -use diesel_async::RunQueryDsl; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::{RawCurrentTokenOwnershipV2, RawTokenOwnershipV2}; +use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::TokenOwnershipV2Convertible; +use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::CurrentTokenOwnershipV2Convertible; // PK of current_token_ownerships_v2, i.e. 
token_data_id, property_version_v1, owner_address, storage_id -pub type CurrentTokenOwnershipV2PK = (String, BigDecimal, String, String); +// pub type CurrentTokenOwnershipV2PK = (String, BigDecimal, String, String); #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] @@ -56,6 +39,26 @@ pub struct TokenOwnershipV2 { pub non_transferrable_by_owner: Option, } +impl TokenOwnershipV2Convertible for TokenOwnershipV2 { + fn from_raw(raw_item: RawTokenOwnershipV2) -> Self{ + Self { + transaction_version: raw_item.transaction_version, + write_set_change_index: raw_item.write_set_change_index, + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1, + owner_address: raw_item.owner_address, + storage_id: raw_item.storage_id, + amount: raw_item.amount, + table_type_v1: raw_item.table_type_v1, + token_properties_mutated_v1: raw_item.token_properties_mutated_v1, + is_soulbound_v2: raw_item.is_soulbound_v2, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + transaction_timestamp: raw_item.transaction_timestamp, + non_transferrable_by_owner: raw_item.non_transferrable_by_owner, + } + } +} #[derive( Clone, Debug, Deserialize, Eq, FieldCount, Identifiable, Insertable, PartialEq, Serialize, )] @@ -93,560 +96,581 @@ impl PartialOrd for CurrentTokenOwnershipV2 { } } -// Facilitate tracking when a token is burned -#[derive(Clone, Debug)] -pub struct NFTOwnershipV2 { - pub token_data_id: String, - pub owner_address: String, - pub is_soulbound: Option, -} - -/// Need a separate struct for queryable because we don't want to define the inserted_at column (letting DB fill) -#[derive(Clone, Debug, Identifiable, Queryable)] -#[diesel(primary_key(token_data_id, property_version_v1, owner_address, storage_id))] -#[diesel(table_name = current_token_ownerships_v2)] -pub struct CurrentTokenOwnershipV2Query { - pub token_data_id: String, - pub property_version_v1: BigDecimal, - pub owner_address: String, - pub storage_id: String, - pub amount: BigDecimal, - pub table_type_v1: Option, - pub token_properties_mutated_v1: Option, - pub is_soulbound_v2: Option, - pub token_standard: String, - pub is_fungible_v2: Option, - pub last_transaction_version: i64, - pub last_transaction_timestamp: chrono::NaiveDateTime, - pub inserted_at: chrono::NaiveDateTime, - pub non_transferrable_by_owner: Option, -} - -impl TokenOwnershipV2 { - /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data - /// Vecs are returned because there could be multiple transfers in a single transaction and we need to document each one here. - pub fn get_nft_v2_from_token_data( - token_data: &TokenDataV2, - object_metadatas: &ObjectAggregatedDataMapping, - ) -> anyhow::Result<( - Vec, - AHashMap, - )> { - let mut ownerships = vec![]; - let mut current_ownerships = AHashMap::new(); - - let object_data = object_metadatas - .get(&token_data.token_data_id) - .context("If token data exists objectcore must exist")?; - let object_core = object_data.object.object_core.clone(); - let token_data_id = token_data.token_data_id.clone(); - let owner_address = object_core.get_owner_address(); - let storage_id = token_data_id.clone(); - - // is_soulbound currently means if an object is completely untransferrable - // OR if only admin can transfer. 
Only the former is true soulbound but - // people might already be using it with the latter meaning so let's include both. - let is_soulbound = if object_data.untransferable.as_ref().is_some() { - true - } else { - !object_core.allow_ungated_transfer - }; - let non_transferrable_by_owner = !object_core.allow_ungated_transfer; - - ownerships.push(Self { - transaction_version: token_data.transaction_version, - write_set_change_index: token_data.write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - owner_address: Some(owner_address.clone()), - storage_id: storage_id.clone(), - amount: BigDecimal::one(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - transaction_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }); - current_ownerships.insert( - ( - token_data_id.clone(), - BigDecimal::zero(), - owner_address.clone(), - storage_id.clone(), - ), - CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - owner_address, - storage_id: storage_id.clone(), - amount: BigDecimal::one(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - last_transaction_version: token_data.transaction_version, - last_transaction_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }, - ); - - // check if token was transferred - for (event_index, transfer_event) in &object_data.transfer_events { - // If it's a self transfer then skip - if transfer_event.get_to_address() == transfer_event.get_from_address() { - continue; - } - ownerships.push(Self { - transaction_version: token_data.transaction_version, - // set to negative of event index to avoid collison with write set index - write_set_change_index: -1 * event_index, - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - // previous owner - owner_address: Some(transfer_event.get_from_address()), - storage_id: storage_id.clone(), - // soft delete - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - transaction_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(is_soulbound), - }); - current_ownerships.insert( - ( - token_data_id.clone(), - BigDecimal::zero(), - transfer_event.get_from_address(), - storage_id.clone(), - ), - CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - // previous owner - owner_address: transfer_event.get_from_address(), - storage_id: storage_id.clone(), - // soft delete - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, - last_transaction_version: token_data.transaction_version, - last_transaction_timestamp: token_data.transaction_timestamp, - non_transferrable_by_owner: Some(is_soulbound), - }, - ); - } - Ok((ownerships, current_ownerships)) - } - - /// This handles the case where token is burned but objectCore is still there - pub async fn 
get_burned_nft_v2_from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - object_metadatas: &ObjectAggregatedDataMapping, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result> { - let token_data_id = standardize_address(&write_resource.address.to_string()); - if tokens_burned - .get(&standardize_address(&token_data_id)) - .is_some() - { - if let Some(object) = &ObjectWithMetadata::from_write_resource(write_resource)? { - let object_core = &object.object_core; - let owner_address = object_core.get_owner_address(); - let storage_id = token_data_id.clone(); - - // is_soulbound currently means if an object is completely untransferrable - // OR if only admin can transfer. Only the former is true soulbound but - // people might already be using it with the latter meaning so let's include both. - let is_soulbound = if object_metadatas - .get(&token_data_id) - .map(|obj| obj.untransferable.as_ref()) - .is_some() - { - true - } else { - !object_core.allow_ungated_transfer - }; - let non_transferrable_by_owner = !object_core.allow_ungated_transfer; - - return Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - owner_address: Some(owner_address.clone()), - storage_id: storage_id.clone(), - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: Some(false), - transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }, - CurrentTokenOwnershipV2 { - token_data_id, - property_version_v1: BigDecimal::zero(), - owner_address, - storage_id, - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: Some(is_soulbound), - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: Some(false), - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: Some(non_transferrable_by_owner), - }, - ))); - } else { - return Self::get_burned_nft_v2_helper( - &token_data_id, - txn_version, - write_set_change_index, - txn_timestamp, - prior_nft_ownership, - tokens_burned, - conn, - query_retries, - query_retry_delay_ms, - ) - .await; - } - } - Ok(None) - } - - /// This handles the case where token is burned and objectCore is deleted - pub async fn get_burned_nft_v2_from_delete_resource( - delete_resource: &DeleteResource, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - conn: &mut DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result> { - let token_address = standardize_address(&delete_resource.address.to_string()); - Self::get_burned_nft_v2_helper( - &token_address, - txn_version, - write_set_change_index, - txn_timestamp, - prior_nft_ownership, - tokens_burned, - conn, - query_retries, - query_retry_delay_ms, - ) - .await - } - - async fn get_burned_nft_v2_helper( - token_address: &str, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - prior_nft_ownership: &AHashMap, - tokens_burned: &TokenV2Burned, - conn: &mut 
DbPoolConnection<'_>, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result> { - let token_address = standardize_address(token_address); - if let Some(burn_event) = tokens_burned.get(&token_address) { - // 1. Try to lookup token address in burn event mapping - let previous_owner = if let Some(previous_owner) = - burn_event.get_previous_owner_address() - { - previous_owner - } else { - // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. - // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. - match prior_nft_ownership.get(&token_address) { - Some(inner) => inner.owner_address.clone(), - None => { - match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( - conn, - &token_address, - query_retries, - query_retry_delay_ms, - ) - .await - { - Ok(nft) => nft.owner_address.clone(), - Err(_) => { - tracing::error!( - transaction_version = txn_version, - lookup_key = &token_address, - "Failed to find current_token_ownership_v2 for burned token. You probably should backfill db." - ); - DEFAULT_OWNER_ADDRESS.to_string() - }, - } - }, - } - }; - - let token_data_id = token_address.clone(); - let storage_id = token_data_id.clone(); - - return Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id: token_data_id.clone(), - property_version_v1: BigDecimal::zero(), - owner_address: Some(previous_owner.clone()), - storage_id: storage_id.clone(), - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, // default - transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - CurrentTokenOwnershipV2 { - token_data_id, - property_version_v1: BigDecimal::zero(), - owner_address: previous_owner, - storage_id, - amount: BigDecimal::zero(), - table_type_v1: None, - token_properties_mutated_v1: None, - is_soulbound_v2: None, // default - token_standard: TokenStandard::V2.to_string(), - is_fungible_v2: None, // default - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, // default - }, - ))); - } - Ok(None) - } - - /// We want to track tokens in any offer/claims and tokenstore - pub fn get_v1_from_write_table_item( - table_item: &WriteTableItem, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result)>> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_token = match TokenWriteSet::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? 
{ - Some(TokenWriteSet::Token(inner)) => Some(inner), - _ => None, - }; - - if let Some(token) = maybe_token { - let table_handle = standardize_address(&table_item.handle.to_string()); - let amount = ensure_not_negative(token.amount); - let token_id_struct = token.id; - let token_data_id_struct = token_id_struct.token_data_id; - let token_data_id = token_data_id_struct.to_id(); - - let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { - Some(tm) => { - if tm.table_type != "0x3::token::TokenStore" { - return Ok(None); - } - let owner_address = tm.get_owner_address(); - ( - Some(CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: token_id_struct.property_version.clone(), - owner_address: owner_address.clone(), - storage_id: table_handle.clone(), - amount: amount.clone(), - table_type_v1: Some(tm.table_type.clone()), - token_properties_mutated_v1: Some(token.token_properties.clone()), - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }), - Some(owner_address), - Some(tm.table_type.clone()), - ) - }, - None => (None, None, None), - }; - - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version, - owner_address, - storage_id: table_handle, - amount, - table_type_v1: table_type, - token_properties_mutated_v1: Some(token.token_properties), - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }, - curr_token_ownership, - ))) - } else { - Ok(None) - } - } - - /// We want to track tokens in any offer/claims and tokenstore - pub fn get_v1_from_delete_table_item( - table_item: &DeleteTableItem, - txn_version: i64, - write_set_change_index: i64, - txn_timestamp: chrono::NaiveDateTime, - table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result)>> { - let table_item_data = table_item.data.as_ref().unwrap(); - - let maybe_token_id = match TokenWriteSet::from_table_item_type( - table_item_data.key_type.as_str(), - &table_item_data.key, - txn_version, - )? 
{ - Some(TokenWriteSet::TokenId(inner)) => Some(inner), - _ => None, - }; - - if let Some(token_id_struct) = maybe_token_id { - let table_handle = standardize_address(&table_item.handle.to_string()); - let token_data_id_struct = token_id_struct.token_data_id; - let token_data_id = token_data_id_struct.to_id(); - - let maybe_table_metadata = table_handle_to_owner.get(&table_handle); - let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { - Some(tm) => { - if tm.table_type != "0x3::token::TokenStore" { - return Ok(None); - } - let owner_address = tm.get_owner_address(); - ( - Some(CurrentTokenOwnershipV2 { - token_data_id: token_data_id.clone(), - property_version_v1: token_id_struct.property_version.clone(), - owner_address: owner_address.clone(), - storage_id: table_handle.clone(), - amount: BigDecimal::zero(), - table_type_v1: Some(tm.table_type.clone()), - token_properties_mutated_v1: None, - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - last_transaction_version: txn_version, - last_transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }), - Some(owner_address), - Some(tm.table_type.clone()), - ) - }, - None => (None, None, None), - }; - - Ok(Some(( - Self { - transaction_version: txn_version, - write_set_change_index, - token_data_id, - property_version_v1: token_id_struct.property_version, - owner_address, - storage_id: table_handle, - amount: BigDecimal::zero(), - table_type_v1: table_type, - token_properties_mutated_v1: None, - is_soulbound_v2: None, - token_standard: TokenStandard::V1.to_string(), - is_fungible_v2: None, - transaction_timestamp: txn_timestamp, - non_transferrable_by_owner: None, - }, - curr_token_ownership, - ))) - } else { - Ok(None) +impl CurrentTokenOwnershipV2Convertible for CurrentTokenOwnershipV2 { + fn from_raw(raw_item: RawCurrentTokenOwnershipV2) -> Self { + Self { + token_data_id: raw_item.token_data_id, + property_version_v1: raw_item.property_version_v1, + owner_address: raw_item.owner_address, + storage_id: raw_item.storage_id, + amount: raw_item.amount, + table_type_v1: raw_item.table_type_v1, + token_properties_mutated_v1: raw_item.token_properties_mutated_v1, + is_soulbound_v2: raw_item.is_soulbound_v2, + token_standard: raw_item.token_standard, + is_fungible_v2: raw_item.is_fungible_v2, + last_transaction_version: raw_item.last_transaction_version, + last_transaction_timestamp: raw_item.last_transaction_timestamp, + non_transferrable_by_owner: raw_item.non_transferrable_by_owner, } } } -impl CurrentTokenOwnershipV2Query { - pub async fn get_latest_owned_nft_by_token_data_id( - conn: &mut DbPoolConnection<'_>, - token_data_id: &str, - query_retries: u32, - query_retry_delay_ms: u64, - ) -> anyhow::Result { - let mut tried = 0; - while tried < query_retries { - tried += 1; - match Self::get_latest_owned_nft_by_token_data_id_impl(conn, token_data_id).await { - Ok(inner) => { - return Ok(NFTOwnershipV2 { - token_data_id: inner.token_data_id.clone(), - owner_address: inner.owner_address.clone(), - is_soulbound: inner.is_soulbound_v2, - }); - }, - Err(_) => { - if tried < query_retries { - tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) - .await; - } - }, - } - } - Err(anyhow::anyhow!( - "Failed to get nft by token data id: {}", - token_data_id - )) - } - - async fn get_latest_owned_nft_by_token_data_id_impl( - conn: &mut DbPoolConnection<'_>, - token_data_id: &str, - ) -> diesel::QueryResult { - 
current_token_ownerships_v2::table - .filter(current_token_ownerships_v2::token_data_id.eq(token_data_id)) - .filter(current_token_ownerships_v2::amount.gt(BigDecimal::zero())) - .first::(conn) - .await - } -} +// Facilitate tracking when a token is burned +// #[derive(Clone, Debug)] +// pub struct NFTOwnershipV2 { +// pub token_data_id: String, +// pub owner_address: String, +// pub is_soulbound: Option, +// } +// +// /// Need a separate struct for queryable because we don't want to define the inserted_at column (letting DB fill) +// #[derive(Clone, Debug, Identifiable, Queryable)] +// #[diesel(primary_key(token_data_id, property_version_v1, owner_address, storage_id))] +// #[diesel(table_name = current_token_ownerships_v2)] +// pub struct CurrentTokenOwnershipV2Query { +// pub token_data_id: String, +// pub property_version_v1: BigDecimal, +// pub owner_address: String, +// pub storage_id: String, +// pub amount: BigDecimal, +// pub table_type_v1: Option, +// pub token_properties_mutated_v1: Option, +// pub is_soulbound_v2: Option, +// pub token_standard: String, +// pub is_fungible_v2: Option, +// pub last_transaction_version: i64, +// pub last_transaction_timestamp: chrono::NaiveDateTime, +// pub inserted_at: chrono::NaiveDateTime, +// pub non_transferrable_by_owner: Option, +// } + +// +// impl TokenOwnershipV2 { +// /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data +// /// Vecs are returned because there could be multiple transfers in a single transaction and we need to document each one here. +// pub fn get_nft_v2_from_token_data( +// token_data: &TokenDataV2, +// object_metadatas: &ObjectAggregatedDataMapping, +// ) -> anyhow::Result<( +// Vec, +// AHashMap, +// )> { +// let mut ownerships = vec![]; +// let mut current_ownerships = AHashMap::new(); +// +// let object_data = object_metadatas +// .get(&token_data.token_data_id) +// .context("If token data exists objectcore must exist")?; +// let object_core = object_data.object.object_core.clone(); +// let token_data_id = token_data.token_data_id.clone(); +// let owner_address = object_core.get_owner_address(); +// let storage_id = token_data_id.clone(); +// +// // is_soulbound currently means if an object is completely untransferrable +// // OR if only admin can transfer. Only the former is true soulbound but +// // people might already be using it with the latter meaning so let's include both. 
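// A standalone sketch of the soulbound rule spelled out in the comment above, with
// hypothetical parameter names standing in for object_data.untransferable.is_some()
// and object_core.allow_ungated_transfer from the surrounding code:
fn soulbound_flags(untransferable_present: bool, allow_ungated_transfer: bool) -> (bool, bool) {
    // Soulbound if the object is fully untransferable OR only an admin can move it.
    let is_soulbound = untransferable_present || !allow_ungated_transfer;
    // Strictly "the owner cannot transfer", independent of the Untransferable resource.
    let non_transferrable_by_owner = !allow_ungated_transfer;
    (is_soulbound, non_transferrable_by_owner)
}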
+// let is_soulbound = if object_data.untransferable.as_ref().is_some() { +// true +// } else { +// !object_core.allow_ungated_transfer +// }; +// let non_transferrable_by_owner = !object_core.allow_ungated_transfer; +// +// ownerships.push(Self { +// transaction_version: token_data.transaction_version, +// write_set_change_index: token_data.write_set_change_index, +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// owner_address: Some(owner_address.clone()), +// storage_id: storage_id.clone(), +// amount: BigDecimal::one(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, +// transaction_timestamp: token_data.transaction_timestamp, +// non_transferrable_by_owner: Some(non_transferrable_by_owner), +// }); +// current_ownerships.insert( +// ( +// token_data_id.clone(), +// BigDecimal::zero(), +// owner_address.clone(), +// storage_id.clone(), +// ), +// CurrentTokenOwnershipV2 { +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// owner_address, +// storage_id: storage_id.clone(), +// amount: BigDecimal::one(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, +// last_transaction_version: token_data.transaction_version, +// last_transaction_timestamp: token_data.transaction_timestamp, +// non_transferrable_by_owner: Some(non_transferrable_by_owner), +// }, +// ); +// +// // check if token was transferred +// for (event_index, transfer_event) in &object_data.transfer_events { +// // If it's a self transfer then skip +// if transfer_event.get_to_address() == transfer_event.get_from_address() { +// continue; +// } +// ownerships.push(Self { +// transaction_version: token_data.transaction_version, +// // set to negative of event index to avoid collison with write set index +// write_set_change_index: -1 * event_index, +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// // previous owner +// owner_address: Some(transfer_event.get_from_address()), +// storage_id: storage_id.clone(), +// // soft delete +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, +// transaction_timestamp: token_data.transaction_timestamp, +// non_transferrable_by_owner: Some(is_soulbound), +// }); +// current_ownerships.insert( +// ( +// token_data_id.clone(), +// BigDecimal::zero(), +// transfer_event.get_from_address(), +// storage_id.clone(), +// ), +// CurrentTokenOwnershipV2 { +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// // previous owner +// owner_address: transfer_event.get_from_address(), +// storage_id: storage_id.clone(), +// // soft delete +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, +// last_transaction_version: token_data.transaction_version, +// last_transaction_timestamp: token_data.transaction_timestamp, +// non_transferrable_by_owner: Some(is_soulbound), +// }, +// ); +// } +// Ok((ownerships, current_ownerships)) +// } +// +// /// This handles the case where 
token is burned but objectCore is still there +// pub async fn get_burned_nft_v2_from_write_resource( +// write_resource: &WriteResource, +// txn_version: i64, +// write_set_change_index: i64, +// txn_timestamp: chrono::NaiveDateTime, +// prior_nft_ownership: &AHashMap, +// tokens_burned: &TokenV2Burned, +// object_metadatas: &ObjectAggregatedDataMapping, +// conn: &mut DbPoolConnection<'_>, +// query_retries: u32, +// query_retry_delay_ms: u64, +// ) -> anyhow::Result> { +// let token_data_id = standardize_address(&write_resource.address.to_string()); +// if tokens_burned +// .get(&standardize_address(&token_data_id)) +// .is_some() +// { +// if let Some(object) = &ObjectWithMetadata::from_write_resource(write_resource)? { +// let object_core = &object.object_core; +// let owner_address = object_core.get_owner_address(); +// let storage_id = token_data_id.clone(); +// +// // is_soulbound currently means if an object is completely untransferrable +// // OR if only admin can transfer. Only the former is true soulbound but +// // people might already be using it with the latter meaning so let's include both. +// let is_soulbound = if object_metadatas +// .get(&token_data_id) +// .map(|obj| obj.untransferable.as_ref()) +// .is_some() +// { +// true +// } else { +// !object_core.allow_ungated_transfer +// }; +// let non_transferrable_by_owner = !object_core.allow_ungated_transfer; +// +// return Ok(Some(( +// Self { +// transaction_version: txn_version, +// write_set_change_index, +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// owner_address: Some(owner_address.clone()), +// storage_id: storage_id.clone(), +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: Some(false), +// transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: Some(non_transferrable_by_owner), +// }, +// CurrentTokenOwnershipV2 { +// token_data_id, +// property_version_v1: BigDecimal::zero(), +// owner_address, +// storage_id, +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: Some(is_soulbound), +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: Some(false), +// last_transaction_version: txn_version, +// last_transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: Some(non_transferrable_by_owner), +// }, +// ))); +// } else { +// return Self::get_burned_nft_v2_helper( +// &token_data_id, +// txn_version, +// write_set_change_index, +// txn_timestamp, +// prior_nft_ownership, +// tokens_burned, +// conn, +// query_retries, +// query_retry_delay_ms, +// ) +// .await; +// } +// } +// Ok(None) +// } +// +// /// This handles the case where token is burned and objectCore is deleted +// pub async fn get_burned_nft_v2_from_delete_resource( +// delete_resource: &DeleteResource, +// txn_version: i64, +// write_set_change_index: i64, +// txn_timestamp: chrono::NaiveDateTime, +// prior_nft_ownership: &AHashMap, +// tokens_burned: &TokenV2Burned, +// conn: &mut DbPoolConnection<'_>, +// query_retries: u32, +// query_retry_delay_ms: u64, +// ) -> anyhow::Result> { +// let token_address = standardize_address(&delete_resource.address.to_string()); +// Self::get_burned_nft_v2_helper( +// &token_address, +// txn_version, +// write_set_change_index, +// txn_timestamp, +// prior_nft_ownership, +// tokens_burned, +// conn, +// 
query_retries, +// query_retry_delay_ms, +// ) +// .await +// } +// +// async fn get_burned_nft_v2_helper( +// token_address: &str, +// txn_version: i64, +// write_set_change_index: i64, +// txn_timestamp: chrono::NaiveDateTime, +// prior_nft_ownership: &AHashMap, +// tokens_burned: &TokenV2Burned, +// conn: &mut DbPoolConnection<'_>, +// query_retries: u32, +// query_retry_delay_ms: u64, +// ) -> anyhow::Result> { +// let token_address = standardize_address(token_address); +// if let Some(burn_event) = tokens_burned.get(&token_address) { +// // 1. Try to lookup token address in burn event mapping +// let previous_owner = if let Some(previous_owner) = +// burn_event.get_previous_owner_address() +// { +// previous_owner +// } else { +// // 2. If it doesn't exist in burn event mapping, then it must be an old burn event that doesn't contain previous_owner. +// // Do a lookup to get previous owner. This is necessary because previous owner is part of current token ownerships primary key. +// match prior_nft_ownership.get(&token_address) { +// Some(inner) => inner.owner_address.clone(), +// None => { +// match CurrentTokenOwnershipV2Query::get_latest_owned_nft_by_token_data_id( +// conn, +// &token_address, +// query_retries, +// query_retry_delay_ms, +// ) +// .await +// { +// Ok(nft) => nft.owner_address.clone(), +// Err(_) => { +// tracing::error!( +// transaction_version = txn_version, +// lookup_key = &token_address, +// "Failed to find current_token_ownership_v2 for burned token. You probably should backfill db." +// ); +// DEFAULT_OWNER_ADDRESS.to_string() +// }, +// } +// }, +// } +// }; +// +// let token_data_id = token_address.clone(); +// let storage_id = token_data_id.clone(); +// +// return Ok(Some(( +// Self { +// transaction_version: txn_version, +// write_set_change_index, +// token_data_id: token_data_id.clone(), +// property_version_v1: BigDecimal::zero(), +// owner_address: Some(previous_owner.clone()), +// storage_id: storage_id.clone(), +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: None, // default +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, // default +// transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, // default +// }, +// CurrentTokenOwnershipV2 { +// token_data_id, +// property_version_v1: BigDecimal::zero(), +// owner_address: previous_owner, +// storage_id, +// amount: BigDecimal::zero(), +// table_type_v1: None, +// token_properties_mutated_v1: None, +// is_soulbound_v2: None, // default +// token_standard: TokenStandard::V2.to_string(), +// is_fungible_v2: None, // default +// last_transaction_version: txn_version, +// last_transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, // default +// }, +// ))); +// } +// Ok(None) +// } +// +// /// We want to track tokens in any offer/claims and tokenstore +// pub fn get_v1_from_write_table_item( +// table_item: &WriteTableItem, +// txn_version: i64, +// write_set_change_index: i64, +// txn_timestamp: chrono::NaiveDateTime, +// table_handle_to_owner: &TableHandleToOwner, +// ) -> anyhow::Result)>> { +// let table_item_data = table_item.data.as_ref().unwrap(); +// +// let maybe_token = match TokenWriteSet::from_table_item_type( +// table_item_data.value_type.as_str(), +// &table_item_data.value, +// txn_version, +// )? 
{ +// Some(TokenWriteSet::Token(inner)) => Some(inner), +// _ => None, +// }; +// +// if let Some(token) = maybe_token { +// let table_handle = standardize_address(&table_item.handle.to_string()); +// let amount = ensure_not_negative(token.amount); +// let token_id_struct = token.id; +// let token_data_id_struct = token_id_struct.token_data_id; +// let token_data_id = token_data_id_struct.to_id(); +// +// let maybe_table_metadata = table_handle_to_owner.get(&table_handle); +// let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { +// Some(tm) => { +// if tm.table_type != "0x3::token::TokenStore" { +// return Ok(None); +// } +// let owner_address = tm.get_owner_address(); +// ( +// Some(CurrentTokenOwnershipV2 { +// token_data_id: token_data_id.clone(), +// property_version_v1: token_id_struct.property_version.clone(), +// owner_address: owner_address.clone(), +// storage_id: table_handle.clone(), +// amount: amount.clone(), +// table_type_v1: Some(tm.table_type.clone()), +// token_properties_mutated_v1: Some(token.token_properties.clone()), +// is_soulbound_v2: None, +// token_standard: TokenStandard::V1.to_string(), +// is_fungible_v2: None, +// last_transaction_version: txn_version, +// last_transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, +// }), +// Some(owner_address), +// Some(tm.table_type.clone()), +// ) +// }, +// None => (None, None, None), +// }; +// +// Ok(Some(( +// Self { +// transaction_version: txn_version, +// write_set_change_index, +// token_data_id, +// property_version_v1: token_id_struct.property_version, +// owner_address, +// storage_id: table_handle, +// amount, +// table_type_v1: table_type, +// token_properties_mutated_v1: Some(token.token_properties), +// is_soulbound_v2: None, +// token_standard: TokenStandard::V1.to_string(), +// is_fungible_v2: None, +// transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, +// }, +// curr_token_ownership, +// ))) +// } else { +// Ok(None) +// } +// } +// +// /// We want to track tokens in any offer/claims and tokenstore +// pub fn get_v1_from_delete_table_item( +// table_item: &DeleteTableItem, +// txn_version: i64, +// write_set_change_index: i64, +// txn_timestamp: chrono::NaiveDateTime, +// table_handle_to_owner: &TableHandleToOwner, +// ) -> anyhow::Result)>> { +// let table_item_data = table_item.data.as_ref().unwrap(); +// +// let maybe_token_id = match TokenWriteSet::from_table_item_type( +// table_item_data.key_type.as_str(), +// &table_item_data.key, +// txn_version, +// )? 
{ +// Some(TokenWriteSet::TokenId(inner)) => Some(inner), +// _ => None, +// }; +// +// if let Some(token_id_struct) = maybe_token_id { +// let table_handle = standardize_address(&table_item.handle.to_string()); +// let token_data_id_struct = token_id_struct.token_data_id; +// let token_data_id = token_data_id_struct.to_id(); +// +// let maybe_table_metadata = table_handle_to_owner.get(&table_handle); +// let (curr_token_ownership, owner_address, table_type) = match maybe_table_metadata { +// Some(tm) => { +// if tm.table_type != "0x3::token::TokenStore" { +// return Ok(None); +// } +// let owner_address = tm.get_owner_address(); +// ( +// Some(CurrentTokenOwnershipV2 { +// token_data_id: token_data_id.clone(), +// property_version_v1: token_id_struct.property_version.clone(), +// owner_address: owner_address.clone(), +// storage_id: table_handle.clone(), +// amount: BigDecimal::zero(), +// table_type_v1: Some(tm.table_type.clone()), +// token_properties_mutated_v1: None, +// is_soulbound_v2: None, +// token_standard: TokenStandard::V1.to_string(), +// is_fungible_v2: None, +// last_transaction_version: txn_version, +// last_transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, +// }), +// Some(owner_address), +// Some(tm.table_type.clone()), +// ) +// }, +// None => (None, None, None), +// }; +// +// Ok(Some(( +// Self { +// transaction_version: txn_version, +// write_set_change_index, +// token_data_id, +// property_version_v1: token_id_struct.property_version, +// owner_address, +// storage_id: table_handle, +// amount: BigDecimal::zero(), +// table_type_v1: table_type, +// token_properties_mutated_v1: None, +// is_soulbound_v2: None, +// token_standard: TokenStandard::V1.to_string(), +// is_fungible_v2: None, +// transaction_timestamp: txn_timestamp, +// non_transferrable_by_owner: None, +// }, +// curr_token_ownership, +// ))) +// } else { +// Ok(None) +// } +// } +// } +// +// impl CurrentTokenOwnershipV2Query { +// pub async fn get_latest_owned_nft_by_token_data_id( +// conn: &mut DbPoolConnection<'_>, +// token_data_id: &str, +// query_retries: u32, +// query_retry_delay_ms: u64, +// ) -> anyhow::Result { +// let mut tried = 0; +// while tried < query_retries { +// tried += 1; +// match Self::get_latest_owned_nft_by_token_data_id_impl(conn, token_data_id).await { +// Ok(inner) => { +// return Ok(NFTOwnershipV2 { +// token_data_id: inner.token_data_id.clone(), +// owner_address: inner.owner_address.clone(), +// is_soulbound: inner.is_soulbound_v2, +// }); +// }, +// Err(_) => { +// if tried < query_retries { +// tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)) +// .await; +// } +// }, +// } +// } +// Err(anyhow::anyhow!( +// "Failed to get nft by token data id: {}", +// token_data_id +// )) +// } +// +// async fn get_latest_owned_nft_by_token_data_id_impl( +// conn: &mut DbPoolConnection<'_>, +// token_data_id: &str, +// ) -> diesel::QueryResult { +// current_token_ownerships_v2::table +// .filter(current_token_ownerships_v2::token_data_id.eq(token_data_id)) +// .filter(current_token_ownerships_v2::amount.gt(BigDecimal::zero())) +// .first::(conn) +// .await +// } +// } diff --git a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_utils.rs index b6bab62bf..9e6e6ea40 100644 --- a/rust/processor/src/db/postgres/models/token_v2_models/v2_token_utils.rs +++ b/rust/processor/src/db/postgres/models/token_v2_models/v2_token_utils.rs @@ -1,433 +1,433 @@ -// 
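// The commented-out get_latest_owned_nft_by_token_data_id above retries its DB
// lookup with a fixed delay between attempts. A self-contained sketch of that
// retry pattern, with hypothetical names that are not part of this crate:
async fn with_retries<T, E, F, Fut>(query_retries: u32, query_retry_delay_ms: u64, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for attempt in 0..query_retries {
        match f().await {
            Ok(v) => return Ok(v),
            Err(e) => {
                last_err = Some(e);
                // Sleep between attempts, but not after the final one.
                if attempt + 1 < query_retries {
                    tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms)).await;
                }
            },
        }
    }
    // Panics only if query_retries == 0, i.e. the caller must try at least once.
    Err(last_err.expect("query_retries must be > 0"))
}
-//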
Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -// This is required because a diesel macro makes clippy sad -#![allow(clippy::extra_unused_lifetimes)] - -use crate::{ - db::postgres::models::{ - object_models::v2_object_utils::CurrentObjectPK, - token_models::token_utils::{NAME_LENGTH, URI_LENGTH}, - }, - utils::util::{ - deserialize_from_string, deserialize_token_object_property_map_from_bcs_hexstring, - standardize_address, truncate_str, Aggregator, AggregatorSnapshot, DerivedStringSnapshot, - }, -}; -use ahash::{AHashMap, AHashSet}; -use anyhow::{Context, Result}; -use aptos_protos::transaction::v1::{Event, WriteResource}; -use bigdecimal::BigDecimal; -use lazy_static::lazy_static; -use serde::{Deserialize, Serialize}; -use std::fmt::{self, Formatter}; - -pub const DEFAULT_OWNER_ADDRESS: &str = "unknown"; - -lazy_static! { - pub static ref V2_STANDARD: String = TokenStandard::V2.to_string(); -} - -/// Tracks all token related data in a hashmap for quick access (keyed on address of the object core) -/// Maps address to burn event. If it's an old event previous_owner will be empty -pub type TokenV2Burned = AHashMap; -pub type TokenV2Minted = AHashSet; - -/// Tracks which token standard a token / collection is built upon -#[derive(Serialize)] -pub enum TokenStandard { - V1, - V2, -} - -impl fmt::Display for TokenStandard { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let res = match self { - TokenStandard::V1 => "v1", - TokenStandard::V2 => "v2", - }; - write!(f, "{}", res) - } -} - -/* Section on Collection / Token */ -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Collection { - creator: String, - pub description: String, - // These are set to private because we should never get name or uri directly - name: String, - uri: String, -} - -impl Collection { - pub fn get_creator_address(&self) -> String { - standardize_address(&self.creator) - } - - pub fn get_uri_trunc(&self) -> String { - truncate_str(&self.uri, URI_LENGTH) - } - - pub fn get_name_trunc(&self) -> String { - truncate_str(&self.name, NAME_LENGTH) - } -} - -impl TryFrom<&WriteResource> for Collection { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct AptosCollection { - pub mutable_description: bool, - pub mutable_uri: bool, -} - -impl TryFrom<&WriteResource> for AptosCollection { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TokenV2 { - collection: ResourceReference, - pub description: String, - // These are set to private because we should never get name or uri directly - name: String, - uri: String, -} - -impl TryFrom<&WriteResource> for TokenV2 { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -impl TokenV2 { - pub fn get_collection_address(&self) -> String { - self.collection.get_reference_address() - } - - pub fn get_uri_trunc(&self) -> String { - truncate_str(&self.uri, URI_LENGTH) - } - - pub fn get_name_trunc(&self) -> String { - truncate_str(&self.name, NAME_LENGTH) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct 
ResourceReference { - inner: String, -} - -impl ResourceReference { - pub fn get_reference_address(&self) -> String { - standardize_address(&self.inner) - } -} - -/* Section on Supply */ -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FixedSupply { - #[serde(deserialize_with = "deserialize_from_string")] - pub current_supply: BigDecimal, - #[serde(deserialize_with = "deserialize_from_string")] - pub max_supply: BigDecimal, - #[serde(deserialize_with = "deserialize_from_string")] - pub total_minted: BigDecimal, -} - -impl TryFrom<&WriteResource> for FixedSupply { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct UnlimitedSupply { - #[serde(deserialize_with = "deserialize_from_string")] - pub current_supply: BigDecimal, - #[serde(deserialize_with = "deserialize_from_string")] - pub total_minted: BigDecimal, -} - -impl TryFrom<&WriteResource> for UnlimitedSupply { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ConcurrentSupply { - pub current_supply: Aggregator, - pub total_minted: Aggregator, -} - -impl TryFrom<&WriteResource> for ConcurrentSupply { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -/* Section on Events */ -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct MintEvent { - #[serde(deserialize_with = "deserialize_from_string")] - pub index: BigDecimal, - token: String, -} - -impl MintEvent { - pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { - if let Some(V2TokenEvent::MintEvent(inner)) = - V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() - { - Ok(Some(inner)) - } else { - Ok(None) - } - } - - pub fn get_token_address(&self) -> String { - standardize_address(&self.token) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Mint { - collection: String, - pub index: AggregatorSnapshot, - token: String, -} - -impl Mint { - pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { - if let Some(V2TokenEvent::Mint(inner)) = - V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() - { - Ok(Some(inner)) - } else { - Ok(None) - } - } - - pub fn get_token_address(&self) -> String { - standardize_address(&self.token) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TokenMutationEvent { - pub mutated_field_name: String, - pub old_value: String, - pub new_value: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TokenMutationEventV2 { - pub token_address: String, - pub mutated_field_name: String, - pub old_value: String, - pub new_value: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct BurnEvent { - #[serde(deserialize_with = "deserialize_from_string")] - pub index: BigDecimal, - token: String, -} - -impl BurnEvent { - pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { - if let Some(V2TokenEvent::BurnEvent(inner)) = - V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() - { - Ok(Some(inner)) - } else { 
- Ok(None) - } - } - - pub fn get_token_address(&self) -> String { - standardize_address(&self.token) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Burn { - collection: String, - #[serde(deserialize_with = "deserialize_from_string")] - pub index: BigDecimal, - token: String, - previous_owner: String, -} - -impl Burn { - pub fn new( - collection: String, - index: BigDecimal, - token: String, - previous_owner: String, - ) -> Self { - Burn { - collection, - index, - token, - previous_owner, - } - } - - pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { - if let Some(V2TokenEvent::Burn(inner)) = - V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() - { - Ok(Some(inner)) - } else { - Ok(None) - } - } - - pub fn get_token_address(&self) -> String { - standardize_address(&self.token) - } - - pub fn get_previous_owner_address(&self) -> Option { - if self.previous_owner.is_empty() { - None - } else { - Some(standardize_address(&self.previous_owner)) - } - } - - pub fn get_collection_address(&self) -> String { - standardize_address(&self.collection) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TransferEvent { - from: String, - to: String, - object: String, -} - -impl TransferEvent { - pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { - if let Some(V2TokenEvent::TransferEvent(inner)) = - V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() - { - Ok(Some(inner)) - } else { - Ok(None) - } - } - - pub fn get_from_address(&self) -> String { - standardize_address(&self.from) - } - - pub fn get_to_address(&self) -> String { - standardize_address(&self.to) - } - - pub fn get_object_address(&self) -> String { - standardize_address(&self.object) - } -} - -/* Section on Property Maps */ -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct PropertyMapModel { - #[serde(deserialize_with = "deserialize_token_object_property_map_from_bcs_hexstring")] - pub inner: serde_json::Value, -} - -impl TryFrom<&WriteResource> for PropertyMapModel { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TokenIdentifiers { - name: DerivedStringSnapshot, -} - -impl TryFrom<&WriteResource> for TokenIdentifiers { - type Error = anyhow::Error; - - fn try_from(write_resource: &WriteResource) -> anyhow::Result { - serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) - } -} - -impl TokenIdentifiers { - pub fn get_name_trunc(&self) -> String { - truncate_str(&self.name.value, NAME_LENGTH) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum V2TokenEvent { - Mint(Mint), - MintEvent(MintEvent), - TokenMutationEvent(TokenMutationEvent), - TokenMutation(TokenMutationEventV2), - Burn(Burn), - BurnEvent(BurnEvent), - TransferEvent(TransferEvent), -} - -impl V2TokenEvent { - pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result> { - match data_type { - "0x4::collection::Mint" => { - serde_json::from_str(data).map(|inner| Some(Self::Mint(inner))) - }, - "0x4::collection::MintEvent" => { - serde_json::from_str(data).map(|inner| Some(Self::MintEvent(inner))) - }, - "0x4::token::MutationEvent" => { - serde_json::from_str(data).map(|inner| Some(Self::TokenMutationEvent(inner))) - }, - "0x4::token::Mutation" => { - 
serde_json::from_str(data).map(|inner| Some(Self::TokenMutation(inner))) - }, - "0x4::collection::Burn" => { - serde_json::from_str(data).map(|inner| Some(Self::Burn(inner))) - }, - "0x4::collection::BurnEvent" => { - serde_json::from_str(data).map(|inner| Some(Self::BurnEvent(inner))) - }, - "0x1::object::TransferEvent" | "0x1::object::Transfer" => { - serde_json::from_str(data).map(|inner| Some(Self::TransferEvent(inner))) - }, - _ => Ok(None), - } - .context(format!( - "version {} failed! failed to parse type {}, data {:?}", - txn_version, data_type, data - )) - } -} +// // Copyright © Aptos Foundation +// // SPDX-License-Identifier: Apache-2.0 +// +// // This is required because a diesel macro makes clippy sad +// #![allow(clippy::extra_unused_lifetimes)] +// +// use crate::{ +// db::postgres::models::{ +// object_models::v2_object_utils::CurrentObjectPK, +// token_models::token_utils::{NAME_LENGTH, URI_LENGTH}, +// }, +// utils::util::{ +// deserialize_from_string, deserialize_token_object_property_map_from_bcs_hexstring, +// standardize_address, truncate_str, Aggregator, AggregatorSnapshot, DerivedStringSnapshot, +// }, +// }; +// use ahash::{AHashMap, AHashSet}; +// use anyhow::{Context, Result}; +// use aptos_protos::transaction::v1::{Event, WriteResource}; +// use bigdecimal::BigDecimal; +// use lazy_static::lazy_static; +// use serde::{Deserialize, Serialize}; +// use std::fmt::{self, Formatter}; +// +// pub const DEFAULT_OWNER_ADDRESS: &str = "unknown"; +// +// lazy_static! { +// pub static ref V2_STANDARD: String = TokenStandard::V2.to_string(); +// } +// +// /// Tracks all token related data in a hashmap for quick access (keyed on address of the object core) +// /// Maps address to burn event. If it's an old event previous_owner will be empty +// pub type TokenV2Burned = AHashMap; +// pub type TokenV2Minted = AHashSet; +// +// /// Tracks which token standard a token / collection is built upon +// #[derive(Serialize)] +// pub enum TokenStandard { +// V1, +// V2, +// } +// +// impl fmt::Display for TokenStandard { +// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +// let res = match self { +// TokenStandard::V1 => "v1", +// TokenStandard::V2 => "v2", +// }; +// write!(f, "{}", res) +// } +// } +// +// /* Section on Collection / Token */ +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct Collection { +// creator: String, +// pub description: String, +// // These are set to private because we should never get name or uri directly +// name: String, +// uri: String, +// } +// +// impl Collection { +// pub fn get_creator_address(&self) -> String { +// standardize_address(&self.creator) +// } +// +// pub fn get_uri_trunc(&self) -> String { +// truncate_str(&self.uri, URI_LENGTH) +// } +// +// pub fn get_name_trunc(&self) -> String { +// truncate_str(&self.name, NAME_LENGTH) +// } +// } +// +// impl TryFrom<&WriteResource> for Collection { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct AptosCollection { +// pub mutable_description: bool, +// pub mutable_uri: bool, +// } +// +// impl TryFrom<&WriteResource> for AptosCollection { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// 
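// A usage sketch for the V2TokenEvent router defined in this file: from_event
// deserializes a protobuf event's JSON payload based on its Move type string.
// The event handling here is illustrative only, not how the processors consume it:
fn classify_token_event(event: &Event, txn_version: i64) -> anyhow::Result<()> {
    if let Some(V2TokenEvent::Burn(burn)) =
        V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
    {
        // New-style 0x4::collection::Burn events carry the previous owner; legacy
        // BurnEvent does not, which is why the ownership code falls back to a DB lookup.
        let _previous_owner = burn.get_previous_owner_address();
    }
    Ok(())
}
+//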
#[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct TokenV2 { +// collection: ResourceReference, +// pub description: String, +// // These are set to private because we should never get name or uri directly +// name: String, +// uri: String, +// } +// +// impl TryFrom<&WriteResource> for TokenV2 { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// impl TokenV2 { +// pub fn get_collection_address(&self) -> String { +// self.collection.get_reference_address() +// } +// +// pub fn get_uri_trunc(&self) -> String { +// truncate_str(&self.uri, URI_LENGTH) +// } +// +// pub fn get_name_trunc(&self) -> String { +// truncate_str(&self.name, NAME_LENGTH) +// } +// } +// +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct ResourceReference { +// inner: String, +// } +// +// impl ResourceReference { +// pub fn get_reference_address(&self) -> String { +// standardize_address(&self.inner) +// } +// } +// +// /* Section on Supply */ +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct FixedSupply { +// #[serde(deserialize_with = "deserialize_from_string")] +// pub current_supply: BigDecimal, +// #[serde(deserialize_with = "deserialize_from_string")] +// pub max_supply: BigDecimal, +// #[serde(deserialize_with = "deserialize_from_string")] +// pub total_minted: BigDecimal, +// } +// +// impl TryFrom<&WriteResource> for FixedSupply { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct UnlimitedSupply { +// #[serde(deserialize_with = "deserialize_from_string")] +// pub current_supply: BigDecimal, +// #[serde(deserialize_with = "deserialize_from_string")] +// pub total_minted: BigDecimal, +// } +// +// impl TryFrom<&WriteResource> for UnlimitedSupply { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct ConcurrentSupply { +// pub current_supply: Aggregator, +// pub total_minted: Aggregator, +// } +// +// impl TryFrom<&WriteResource> for ConcurrentSupply { +// type Error = anyhow::Error; +// +// fn try_from(write_resource: &WriteResource) -> anyhow::Result { +// serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg) +// } +// } +// +// /* Section on Events */ +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct MintEvent { +// #[serde(deserialize_with = "deserialize_from_string")] +// pub index: BigDecimal, +// token: String, +// } +// +// impl MintEvent { +// pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result> { +// if let Some(V2TokenEvent::MintEvent(inner)) = +// V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap() +// { +// Ok(Some(inner)) +// } else { +// Ok(None) +// } +// } +// +// pub fn get_token_address(&self) -> String { +// standardize_address(&self.token) +// } +// } +// +// #[derive(Serialize, Deserialize, Debug, Clone)] +// pub struct Mint { +// collection: String, +// pub index: AggregatorSnapshot, +// token: String, +// } +// +// impl Mint { +// pub fn from_event(event: &Event, 
txn_version: i64) -> anyhow::Result<Option<Self>> {
+//         if let Some(V2TokenEvent::Mint(inner)) =
+//             V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+//         {
+//             Ok(Some(inner))
+//         } else {
+//             Ok(None)
+//         }
+//     }
+//
+//     pub fn get_token_address(&self) -> String {
+//         standardize_address(&self.token)
+//     }
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct TokenMutationEvent {
+//     pub mutated_field_name: String,
+//     pub old_value: String,
+//     pub new_value: String,
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct TokenMutationEventV2 {
+//     pub token_address: String,
+//     pub mutated_field_name: String,
+//     pub old_value: String,
+//     pub new_value: String,
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct BurnEvent {
+//     #[serde(deserialize_with = "deserialize_from_string")]
+//     pub index: BigDecimal,
+//     token: String,
+// }
+//
+// impl BurnEvent {
+//     pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+//         if let Some(V2TokenEvent::BurnEvent(inner)) =
+//             V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+//         {
+//             Ok(Some(inner))
+//         } else {
+//             Ok(None)
+//         }
+//     }
+//
+//     pub fn get_token_address(&self) -> String {
+//         standardize_address(&self.token)
+//     }
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct Burn {
+//     collection: String,
+//     #[serde(deserialize_with = "deserialize_from_string")]
+//     pub index: BigDecimal,
+//     token: String,
+//     previous_owner: String,
+// }
+//
+// impl Burn {
+//     pub fn new(
+//         collection: String,
+//         index: BigDecimal,
+//         token: String,
+//         previous_owner: String,
+//     ) -> Self {
+//         Burn {
+//             collection,
+//             index,
+//             token,
+//             previous_owner,
+//         }
+//     }
+//
+//     pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+//         if let Some(V2TokenEvent::Burn(inner)) =
+//             V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+//         {
+//             Ok(Some(inner))
+//         } else {
+//             Ok(None)
+//         }
+//     }
+//
+//     pub fn get_token_address(&self) -> String {
+//         standardize_address(&self.token)
+//     }
+//
+//     pub fn get_previous_owner_address(&self) -> Option<String> {
+//         if self.previous_owner.is_empty() {
+//             None
+//         } else {
+//             Some(standardize_address(&self.previous_owner))
+//         }
+//     }
+//
+//     pub fn get_collection_address(&self) -> String {
+//         standardize_address(&self.collection)
+//     }
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct TransferEvent {
+//     from: String,
+//     to: String,
+//     object: String,
+// }
+//
+// impl TransferEvent {
+//     pub fn from_event(event: &Event, txn_version: i64) -> anyhow::Result<Option<Self>> {
+//         if let Some(V2TokenEvent::TransferEvent(inner)) =
+//             V2TokenEvent::from_event(event.type_str.as_str(), &event.data, txn_version).unwrap()
+//         {
+//             Ok(Some(inner))
+//         } else {
+//             Ok(None)
+//         }
+//     }
+//
+//     pub fn get_from_address(&self) -> String {
+//         standardize_address(&self.from)
+//     }
+//
+//     pub fn get_to_address(&self) -> String {
+//         standardize_address(&self.to)
+//     }
+//
+//     pub fn get_object_address(&self) -> String {
+//         standardize_address(&self.object)
+//     }
+// }
+//
+// /* Section on Property Maps */
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct PropertyMapModel {
+//     #[serde(deserialize_with = "deserialize_token_object_property_map_from_bcs_hexstring")]
+//     pub inner: serde_json::Value,
+// }
+//
+// impl TryFrom<&WriteResource> for PropertyMapModel {
+//     type Error = anyhow::Error;
+//
+//     fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+//         serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+//     }
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub struct TokenIdentifiers {
+//     name: DerivedStringSnapshot,
+// }
+//
+// impl TryFrom<&WriteResource> for TokenIdentifiers {
+//     type Error = anyhow::Error;
+//
+//     fn try_from(write_resource: &WriteResource) -> anyhow::Result<Self> {
+//         serde_json::from_str(write_resource.data.as_str()).map_err(anyhow::Error::msg)
+//     }
+// }
+//
+// impl TokenIdentifiers {
+//     pub fn get_name_trunc(&self) -> String {
+//         truncate_str(&self.name.value, NAME_LENGTH)
+//     }
+// }
+//
+// #[derive(Serialize, Deserialize, Debug, Clone)]
+// pub enum V2TokenEvent {
+//     Mint(Mint),
+//     MintEvent(MintEvent),
+//     TokenMutationEvent(TokenMutationEvent),
+//     TokenMutation(TokenMutationEventV2),
+//     Burn(Burn),
+//     BurnEvent(BurnEvent),
+//     TransferEvent(TransferEvent),
+// }
+//
+// impl V2TokenEvent {
+//     pub fn from_event(data_type: &str, data: &str, txn_version: i64) -> Result<Option<Self>> {
+//         match data_type {
+//             "0x4::collection::Mint" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::Mint(inner)))
+//             },
+//             "0x4::collection::MintEvent" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::MintEvent(inner)))
+//             },
+//             "0x4::token::MutationEvent" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::TokenMutationEvent(inner)))
+//             },
+//             "0x4::token::Mutation" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::TokenMutation(inner)))
+//             },
+//             "0x4::collection::Burn" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::Burn(inner)))
+//             },
+//             "0x4::collection::BurnEvent" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::BurnEvent(inner)))
+//             },
+//             "0x1::object::TransferEvent" | "0x1::object::Transfer" => {
+//                 serde_json::from_str(data).map(|inner| Some(Self::TransferEvent(inner)))
+//             },
+//             _ => Ok(None),
+//         }
+//         .context(format!(
+//             "version {} failed! failed to parse type {}, data {:?}",
+//             txn_version, data_type, data
+//         ))
+//     }
+// }
diff --git a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
index a12d214f1..dc5a6b5ef 100644
--- a/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
+++ b/rust/processor/src/processors/parquet_processors/parquet_token_v2_processor.rs
@@ -12,7 +12,7 @@ use crate::{
             RawTokenDataV2, TokenDataV2Convertible,
         },
         parquet::models::token_v2_models::{
-            v2_token_datas::TokenDataV2, v2_token_ownerships::TokenOwnershipV2,
+            v2_token_datas::TokenDataV2,
         },
         postgres::models::{
             fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata,
@@ -21,12 +21,6 @@ use crate::{
             },
             resources::{FromWriteResource, V2TokenResource},
             token_models::tokens::{TableHandleToOwner, TableMetadataForToken},
-            token_v2_models::{
-                v2_token_ownerships::NFTOwnershipV2,
-                v2_token_utils::{
-                    Burn, BurnEvent, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent,
-                },
-            },
         },
     },
     gap_detectors::ProcessingResult,
@@ -44,6 +38,15 @@ use async_trait::async_trait;
 use kanal::AsyncSender;
 use serde::{Deserialize, Serialize};
 use std::{fmt::Debug, time::Duration};
+use crate::db::parquet::models::token_v2_models::v2_token_ownerships::TokenOwnershipV2;
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Burned;
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Minted;
+use crate::db::common::models::token_v2_models::v2_token_utils::Burn;
+use crate::db::common::models::token_v2_models::v2_token_utils::BurnEvent;
+use crate::db::common::models::token_v2_models::v2_token_utils::MintEvent;
+use crate::db::common::models::token_v2_models::v2_token_utils::TransferEvent;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::NFTOwnershipV2;
+
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
@@ -154,6 +157,11 @@ impl ProcessorTrait for ParquetTokenV2Processor {
             .await
             .context("Failed to send token data v2 parquet data")?;
 
+        // let parquet_token_ownerships_v2: Vec<TokenOwnershipV2> = raw_token_ownerships_v2
+        //     .into_iter()
+        //     .map(TokenOwnershipV2::from_raw)
+        //     .collect();
+
         let token_ownerships_v2_parquet_data = ParquetDataGeneric {
             data: token_ownerships_v2,
         };
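The commented-out block above sketches the raw-to-parquet conversion this migration is heading toward; the same `from_raw` pattern already drives every migrated model in this patch. Below is a minimal, self-contained sketch of that `*Convertible` pattern; the type and trait names match the diff, but the fields are illustrative only (the real models carry the full ownership schema and use BigDecimal amounts):

// Sketch of the Convertible pattern: a storage-specific model is built
// from a shared "raw" model via a `from_raw` constructor on a trait.
pub struct RawTokenOwnershipV2 {
    pub token_data_id: String,
    pub owner_address: Option<String>,
    pub amount: u64, // illustrative; the real model uses BigDecimal
}

pub struct TokenOwnershipV2 {
    pub token_data_id: String,
    pub owner_address: Option<String>,
    pub amount: u64,
}

pub trait TokenOwnershipV2Convertible {
    fn from_raw(raw: RawTokenOwnershipV2) -> Self;
}

impl TokenOwnershipV2Convertible for TokenOwnershipV2 {
    fn from_raw(raw: RawTokenOwnershipV2) -> Self {
        Self {
            token_data_id: raw.token_data_id,
            owner_address: raw.owner_address,
            amount: raw.amount,
        }
    }
}

fn main() {
    let raw_rows = vec![RawTokenOwnershipV2 {
        token_data_id: "0x1".to_string(),
        owner_address: Some("0xabc".to_string()),
        amount: 1,
    }];
    // Same shape as the conversions added throughout this patch.
    let rows: Vec<TokenOwnershipV2> = raw_rows
        .into_iter()
        .map(TokenOwnershipV2::from_raw)
        .collect();
    assert_eq!(rows.len(), 1);
}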
diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs
index 37e0c8b4b..fa6c48280 100644
--- a/rust/processor/src/processors/token_v2_processor.rs
+++ b/rust/processor/src/processors/token_v2_processor.rs
@@ -33,12 +33,9 @@ use crate::{
                 v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2},
                 v2_token_metadata::{CurrentTokenV2Metadata, CurrentTokenV2MetadataPK},
                 v2_token_ownerships::{
-                    CurrentTokenOwnershipV2, CurrentTokenOwnershipV2PK, NFTOwnershipV2,
+                    CurrentTokenOwnershipV2,
                     TokenOwnershipV2,
                 },
-                v2_token_utils::{
-                    Burn, BurnEvent, Mint, MintEvent, TokenV2Burned, TokenV2Minted, TransferEvent,
-                },
             },
         },
     },
@@ -65,6 +62,21 @@ use serde::{Deserialize, Serialize};
 use std::fmt::Debug;
 use tracing::error;
 
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Burned;
+use crate::db::common::models::token_v2_models::v2_token_utils::TokenV2Minted;
+use crate::db::common::models::token_v2_models::v2_token_utils::Burn;
+use crate::db::common::models::token_v2_models::v2_token_utils::Mint;
+use crate::db::common::models::token_v2_models::v2_token_utils::BurnEvent;
+use crate::db::common::models::token_v2_models::v2_token_utils::TransferEvent;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::RawTokenOwnershipV2;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::RawCurrentTokenOwnershipV2;
+use crate::db::common::models::token_v2_models::v2_token_utils::MintEvent;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::NFTOwnershipV2;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::CurrentTokenOwnershipV2PK;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::CurrentTokenOwnershipV2Convertible;
+use crate::db::common::models::token_v2_models::raw_v2_token_ownerships::TokenOwnershipV2Convertible;
+
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
 pub struct TokenV2ProcessorConfig {
@@ -618,12 +630,12 @@ impl ProcessorTrait for TokenV2Processor {
         let (
             mut collections_v2,
             raw_token_datas_v2,
-            mut token_ownerships_v2,
+            raw_token_ownerships_v2,
             current_collections_v2,
             raw_current_token_datas_v2,
             raw_current_deleted_token_datas_v2,
-            current_token_ownerships_v2,
-            current_deleted_token_ownerships_v2,
+            raw_current_token_ownerships_v2,
+            raw_current_deleted_token_ownerships_v2,
             raw_token_activities_v2,
             raw_current_token_v2_metadata,
             raw_current_token_royalties_v1,
@@ -675,6 +687,25 @@ impl ProcessorTrait for TokenV2Processor {
             .map(CurrentTokenDataV2::from_raw)
             .collect();
 
+        let mut postgres_token_ownerships_v2: Vec<TokenOwnershipV2> = raw_token_ownerships_v2
+            .into_iter()
+            .map(TokenOwnershipV2::from_raw)
+            .collect();
+
+        let postgres_current_token_ownerships_v2: Vec<CurrentTokenOwnershipV2> =
+            raw_current_token_ownerships_v2
+                .into_iter()
+                .map(CurrentTokenOwnershipV2::from_raw)
+                .collect();
+
+        let postgres_current_deleted_token_ownerships_v2: Vec<CurrentTokenOwnershipV2> =
+            raw_current_deleted_token_ownerships_v2
+                .into_iter()
+                .map(CurrentTokenOwnershipV2::from_raw)
+                .collect();
+
+
+
         let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
         let db_insertion_start = std::time::Instant::now();
 
@@ -682,7 +713,7 @@ impl ProcessorTrait for TokenV2Processor {
             .deprecated_tables
             .contains(TableFlags::TOKEN_OWNERSHIPS_V2)
         {
-            token_ownerships_v2.clear();
+            postgres_token_ownerships_v2.clear();
         }
         if self.deprecated_tables.contains(TableFlags::TOKEN_DATAS_V2) {
             postgres_token_datas_v2.clear();
@@ -704,15 +735,15 @@ impl ProcessorTrait for TokenV2Processor {
             end_version,
             &collections_v2,
             &postgres_token_datas_v2,
-            &token_ownerships_v2,
+            &postgres_token_ownerships_v2,
             &current_collections_v2,
             (
                 &postgres_current_token_datas_v2,
                 &postgres_current_deleted_token_datas_v2,
             ),
             (
-                &current_token_ownerships_v2,
-                &current_deleted_token_ownerships_v2,
+                &postgres_current_token_ownerships_v2,
+                &postgres_current_deleted_token_ownerships_v2,
             ),
             &postgres_token_activities_v2,
             &postgres_current_token_v2_metadata,
@@ -760,12 +791,12 @@ pub async fn parse_v2_token(
 ) -> (
     Vec<CollectionV2>,
     Vec<RawTokenDataV2>,
-    Vec<TokenOwnershipV2>,
+    Vec<RawTokenOwnershipV2>,
     Vec<CurrentCollectionV2>,
     Vec<RawCurrentTokenDataV2>,
     Vec<RawCurrentTokenDataV2>,
-    Vec<CurrentTokenOwnershipV2>,
-    Vec<CurrentTokenOwnershipV2>, // deleted token ownerships
+    Vec<RawCurrentTokenOwnershipV2>,
+    Vec<RawCurrentTokenOwnershipV2>, // deleted token ownerships
     Vec<RawTokenActivityV2>,
     Vec<RawCurrentTokenV2Metadata>,
     Vec<RawCurrentTokenRoyaltyV1>,
@@ -785,7 +816,7 @@ pub async fn parse_v2_token(
         AHashMap::new();
     let mut current_token_ownerships_v2: AHashMap<
         CurrentTokenOwnershipV2PK,
-        CurrentTokenOwnershipV2,
+        RawCurrentTokenOwnershipV2,
     > = AHashMap::new();
     let mut current_deleted_token_ownerships_v2 = AHashMap::new();
     // Optimization to track prior ownership in case a token gets burned so we can lookup the ownership
@@ -1027,7 +1058,7 @@
                         );
                     }
                     if let Some((token_ownership, current_token_ownership)) =
-                        TokenOwnershipV2::get_v1_from_write_table_item(
+                        RawTokenOwnershipV2::get_v1_from_write_table_item(
                             table_item,
                             txn_version,
                             wsc_index,
@@ -1079,7 +1110,7 @@
                 },
                 Change::DeleteTableItem(table_item) => {
                     if let Some((token_ownership, current_token_ownership)) =
-                        TokenOwnershipV2::get_v1_from_delete_table_item(
+                        RawTokenOwnershipV2::get_v1_from_delete_table_item(
                             table_item,
                             txn_version,
                             wsc_index,
@@ -1159,8 +1190,8 @@
                     {
                         // Add NFT ownership
                         let (mut ownerships, current_ownerships) =
-                            TokenOwnershipV2::get_nft_v2_from_token_data(
-                                &TokenDataV2::from_raw(raw_token_data.clone()),
+                            RawTokenOwnershipV2::get_nft_v2_from_token_data(
+                                &raw_token_data,
                                 &token_v2_metadata_helper,
                             )
                             .unwrap();
@@ -1210,7 +1241,7 @@
                     // Add burned NFT handling
                     // This handles the case where token is burned but objectCore is still there
                     if let Some((nft_ownership, current_nft_ownership)) =
-                        TokenOwnershipV2::get_burned_nft_v2_from_write_resource(
+                        RawTokenOwnershipV2::get_burned_nft_v2_from_write_resource(
                             resource,
                             txn_version,
                             wsc_index,
@@ -1282,7 +1313,7 @@
                         );
                     }
                     if let Some((nft_ownership, current_nft_ownership)) =
-                        TokenOwnershipV2::get_burned_nft_v2_from_delete_resource(
+                        RawTokenOwnershipV2::get_burned_nft_v2_from_delete_resource(
                             resource,
                             txn_version,
                             wsc_index,
@@ -1334,13 +1365,13 @@
         .collect::<Vec<RawCurrentTokenDataV2>>();
     let mut current_token_ownerships_v2 = current_token_ownerships_v2
         .into_values()
-        .collect::<Vec<CurrentTokenOwnershipV2>>();
+        .collect::<Vec<RawCurrentTokenOwnershipV2>>();
     let mut current_token_v2_metadata = current_token_v2_metadata
         .into_values()
        .collect::<Vec<RawCurrentTokenV2Metadata>>();
     let mut current_deleted_token_ownerships_v2 = current_deleted_token_ownerships_v2
         .into_values()
-        .collect::<Vec<CurrentTokenOwnershipV2>>();
+        .collect::<Vec<RawCurrentTokenOwnershipV2>>();
     let mut current_token_royalties_v1 = current_token_royalties_v1
         .into_values()
         .collect::<Vec<RawCurrentTokenRoyaltyV1>>();
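Note how parse_v2_token deduplicates current-state rows before returning them: each row is inserted into an AHashMap keyed by its primary key while the batch is scanned in transaction order, then the map is drained with into_values(). A minimal sketch of that pattern, using std's HashMap instead of ahash and a simplified two-part key (the real CurrentTokenOwnershipV2PK also carries property_version_v1 and storage_id):

use std::collections::HashMap; // the processor uses ahash::AHashMap; std keeps this sketch dependency-free

// Illustrative two-part key standing in for CurrentTokenOwnershipV2PK.
type OwnershipPk = (String, String);

#[derive(Debug)]
struct RawCurrentTokenOwnershipV2 {
    token_data_id: String,
    owner_address: String,
    last_transaction_version: i64,
}

fn main() {
    let mut current: HashMap<OwnershipPk, RawCurrentTokenOwnershipV2> = HashMap::new();
    for row in [
        RawCurrentTokenOwnershipV2 {
            token_data_id: "0x1".into(),
            owner_address: "0xa".into(),
            last_transaction_version: 10,
        },
        RawCurrentTokenOwnershipV2 {
            token_data_id: "0x1".into(),
            owner_address: "0xa".into(),
            last_transaction_version: 11,
        },
    ] {
        // Scanning in transaction order means a later write for the same PK
        // overwrites the earlier one, so only the latest state survives.
        current.insert((row.token_data_id.clone(), row.owner_address.clone()), row);
    }
    // Mirrors `current_token_ownerships_v2.into_values().collect::<Vec<_>>()`.
    let rows: Vec<RawCurrentTokenOwnershipV2> = current.into_values().collect();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].last_transaction_version, 11);
}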
diff --git a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
index c34d6ba9e..41900dd4f 100644
--- a/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
+++ b/rust/sdk-processor/src/steps/token_v2_processor/token_v2_extractor.rs
@@ -14,6 +14,9 @@ use processor::{
             raw_v2_token_activities::TokenActivityV2Convertible,
             raw_v2_token_datas::{CurrentTokenDataV2Convertible, TokenDataV2Convertible},
             raw_v2_token_metadata::CurrentTokenV2MetadataConvertible,
+            raw_v2_token_ownerships::{
+                CurrentTokenOwnershipV2Convertible, TokenOwnershipV2Convertible,
+            },
         },
         postgres::models::{
             token_models::{token_claims::CurrentTokenPendingClaim, tokens::TableMetadataForToken},
@@ -108,12 +111,12 @@ impl Processable for TokenV2Extractor {
         let (
             collections_v2,
             raw_token_datas_v2,
-            token_ownerships_v2,
+            raw_token_ownerships_v2,
             current_collections_v2,
             raw_current_token_datas_v2,
             raw_current_deleted_token_datas_v2,
-            current_token_ownerships_v2,
-            current_deleted_token_ownerships_v2,
+            raw_current_token_ownerships_v2,
+            raw_current_deleted_token_ownerships_v2,
             raw_token_activities_v2,
             raw_current_token_v2_metadata,
             raw_current_token_royalties_v1,
@@ -165,16 +168,33 @@ impl Processable for TokenV2Extractor {
             .map(CurrentTokenDataV2::from_raw)
             .collect();
 
+        let postgres_token_ownerships_v2: Vec<TokenOwnershipV2> = raw_token_ownerships_v2
+            .into_iter()
+            .map(TokenOwnershipV2::from_raw)
+            .collect();
+
+        let postgres_current_token_ownerships_v2: Vec<CurrentTokenOwnershipV2> =
+            raw_current_token_ownerships_v2
+                .into_iter()
+                .map(CurrentTokenOwnershipV2::from_raw)
+                .collect();
+
+        let postgres_current_deleted_token_ownerships_v2: Vec<CurrentTokenOwnershipV2> =
+            raw_current_deleted_token_ownerships_v2
+                .into_iter()
+                .map(CurrentTokenOwnershipV2::from_raw)
+                .collect();
+
         Ok(Some(TransactionContext {
             data: (
                 collections_v2,
                 postgres_token_datas_v2,
-                token_ownerships_v2,
+                postgres_token_ownerships_v2,
                 current_collections_v2,
                 postgres_current_token_datas_v2,
                 postgress_current_deleted_token_datas_v2,
-                current_token_ownerships_v2,
-                current_deleted_token_ownerships_v2,
+                postgres_current_token_ownerships_v2,
+                postgres_current_deleted_token_ownerships_v2,
                 postgres_token_activities_v2,
                 postgres_current_token_v2_metadata,
                 postgres_current_token_royalties_v1,
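One style note on the imports added throughout this patch: the long runs of single-item `use` lines compile fine, but they would normally be collapsed into one nested `use` tree, and rustfmt can do this automatically via its (currently unstable) `imports_granularity = "Crate"` option. A sketch against a hypothetical module tree mirroring the crate paths above:

// Hypothetical module tree standing in for the crate paths used in the diff.
mod db {
    pub mod common {
        pub mod models {
            pub mod token_v2_models {
                pub mod v2_token_utils {
                    pub struct TokenV2Burned;
                    pub struct TokenV2Minted;
                    pub struct Burn;
                }
                pub mod raw_v2_token_ownerships {
                    pub struct NFTOwnershipV2;
                }
            }
        }
    }
}

// One nested `use` tree replaces four single-item lines; rustfmt's unstable
// `imports_granularity = "Crate"` setting produces this grouping automatically.
use crate::db::common::models::token_v2_models::{
    raw_v2_token_ownerships::NFTOwnershipV2,
    v2_token_utils::{Burn, TokenV2Burned, TokenV2Minted},
};

fn main() {
    // Touch the imported unit structs so the example compiles without warnings.
    let _ = (TokenV2Burned, TokenV2Minted, Burn, NFTOwnershipV2);
}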